diff --git a/.github/workflows/gradle-wrapper-validation.yml b/.github/workflows/gradle-wrapper-validation.yml index 25cafa543b..136e06ca18 100644 --- a/.github/workflows/gradle-wrapper-validation.yml +++ b/.github/workflows/gradle-wrapper-validation.yml @@ -9,5 +9,5 @@ jobs: name: "Validation" runs-on: ubuntu-latest steps: - - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 - - uses: gradle/wrapper-validation-action@56b90f209b02bf6d1deae490e9ef18b21a389cd4 # v1.1.0 + - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + - uses: gradle/wrapper-validation-action@f9c9c575b8b21b6485636a91ffecd10e558c62f6 # v3.5.0 diff --git a/.github/workflows/gradle_branch.yml b/.github/workflows/gradle_branch.yml index 0702a8de90..abe9489f67 100644 --- a/.github/workflows/gradle_branch.yml +++ b/.github/workflows/gradle_branch.yml @@ -15,14 +15,14 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 - - name: Set up JDK 8 - uses: actions/setup-java@387ac29b308b003ca37ba93a6cab5eb57c8f5f93 # v4.0.0 + - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + - name: Set up JDK 11 + uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0 with: distribution: 'zulu' - java-version: '8' + java-version: '11' - name: Cache Gradle packages - uses: actions/cache@704facf57e6136b1bc63b828d79edcd491f0ee84 # v3.3.2 + uses: actions/cache@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }} @@ -32,6 +32,6 @@ jobs: - name: Build RxJava run: ./gradlew build --stacktrace - name: Upload to Codecov - uses: codecov/codecov-action@eaaf4bedf32dbdc6b720b63067d99c4d77d6047d # v3.1.4 + uses: codecov/codecov-action@5a1091511ad55cbe89839c7260b706298ca349f7 # v5.5.1 - name: Generate Javadoc run: ./gradlew javadoc --stacktrace diff --git a/.github/workflows/gradle_jdk11.yml b/.github/workflows/gradle_jdk11.yml index d019589806..0769dc2a1e 100644 --- a/.github/workflows/gradle_jdk11.yml +++ b/.github/workflows/gradle_jdk11.yml @@ -12,19 +12,22 @@ on: permissions: contents: read +env: + BUILD_WITH_11: true + jobs: build: runs-on: ubuntu-latest steps: - - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 + - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 - name: Set up JDK 11 - uses: actions/setup-java@387ac29b308b003ca37ba93a6cab5eb57c8f5f93 # v4.0.0 + uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0 with: distribution: 'zulu' java-version: '11' - name: Cache Gradle packages - uses: actions/cache@704facf57e6136b1bc63b828d79edcd491f0ee84 # v3.3.2 + uses: actions/cache@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-1-${{ hashFiles('**/*.gradle') }} @@ -35,5 +38,5 @@ jobs: run: ./gradlew -PjavaCompatibility=9 jar - name: Build RxJava run: ./gradlew build --stacktrace - - name: Generate Javadoc - run: ./gradlew javadoc --stacktrace +# - name: Generate Javadoc +# run: ./gradlew javadoc --stacktrace diff --git a/.github/workflows/gradle_pr.yml b/.github/workflows/gradle_pr.yml index e0f03e68c4..af19ed66f1 100644 --- a/.github/workflows/gradle_pr.yml +++ b/.github/workflows/gradle_pr.yml @@ -15,14 +15,14 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 - - name: Set up JDK 8 - uses: actions/setup-java@387ac29b308b003ca37ba93a6cab5eb57c8f5f93 # v4.0.0 + - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + - name: Set up JDK 11 + uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0 with: distribution: 'zulu' - java-version: '8' + java-version: '11' - name: Cache Gradle packages - uses: actions/cache@704facf57e6136b1bc63b828d79edcd491f0ee84 # v3.3.2 + uses: actions/cache@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-1-${{ hashFiles('**/*.gradle') }} @@ -32,6 +32,6 @@ jobs: - name: Build RxJava run: ./gradlew build --stacktrace - name: Upload to Codecov - uses: codecov/codecov-action@eaaf4bedf32dbdc6b720b63067d99c4d77d6047d # v3.1.4 + uses: codecov/codecov-action@5a1091511ad55cbe89839c7260b706298ca349f7 # v5.5.1 - name: Generate Javadoc run: ./gradlew javadoc --stacktrace diff --git a/.github/workflows/gradle_release.yml b/.github/workflows/gradle_release.yml index b240e12f98..2a5d60ddb3 100644 --- a/.github/workflows/gradle_release.yml +++ b/.github/workflows/gradle_release.yml @@ -22,14 +22,14 @@ jobs: env: CI_BUILD_NUMBER: ${{ github.run_number }} steps: - - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 - - name: Set up JDK 8 - uses: actions/setup-java@387ac29b308b003ca37ba93a6cab5eb57c8f5f93 # v4.0.0 + - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + - name: Set up JDK 11 + uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0 with: distribution: 'zulu' - java-version: '8' + java-version: '11' - name: Cache Gradle packages - uses: actions/cache@704facf57e6136b1bc63b828d79edcd491f0ee84 # v3.3.2 + uses: actions/cache@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }} @@ -43,9 +43,18 @@ jobs: - name: Build RxJava run: ./gradlew build --stacktrace --no-daemon - name: Upload to Codecov - uses: codecov/codecov-action@eaaf4bedf32dbdc6b720b63067d99c4d77d6047d # v3.1.4 - - name: Upload release - run: ./gradlew -PreleaseMode=full publish --no-daemon --no-parallel --stacktrace + uses: codecov/codecov-action@5a1091511ad55cbe89839c7260b706298ca349f7 # v5.5.1 +# - name: Upload release +# run: ./gradlew -PreleaseMode=full publish --no-daemon --no-parallel --stacktrace +# env: +# # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions +# # ------------------------------------------------------------------------------ +# ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }} +# ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }} +# ORG_GRADLE_PROJECT_SIGNING_PRIVATE_KEY: ${{ secrets.SIGNING_PRIVATE_KEY }} +# ORG_GRADLE_PROJECT_SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }} + - name: Publish release + run: ./gradlew -PreleaseMode=full publishAndReleaseToMavenCentral --no-configuration-cache --no-daemon --no-parallel --stacktrace env: # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions # ------------------------------------------------------------------------------ @@ -53,13 +62,6 @@ jobs: ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }} ORG_GRADLE_PROJECT_SIGNING_PRIVATE_KEY: ${{ secrets.SIGNING_PRIVATE_KEY }} ORG_GRADLE_PROJECT_SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }} - - name: Publish release - run: ./gradlew -PreleaseMode=full closeAndReleaseRepository --no-daemon --no-parallel --stacktrace - env: - # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions - # ------------------------------------------------------------------------------ - ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }} - ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }} - name: Push Javadoc run: ./push_javadoc.sh env: diff --git a/.github/workflows/gradle_snapshot.yml b/.github/workflows/gradle_snapshot.yml index b50829cf01..be0db04f44 100644 --- a/.github/workflows/gradle_snapshot.yml +++ b/.github/workflows/gradle_snapshot.yml @@ -21,14 +21,14 @@ jobs: # ------------------------------------------------------------------------------ CI_BUILD_NUMBER: ${{ github.run_number }} steps: - - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 - - name: Set up JDK 8 - uses: actions/setup-java@387ac29b308b003ca37ba93a6cab5eb57c8f5f93 # v4.0.0 + - uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 + - name: Set up JDK 11 + uses: actions/setup-java@dded0888837ed1f317902acf8a20df0ad188d165 # v5.0.0 with: distribution: 'zulu' - java-version: '8' + java-version: '11' - name: Cache Gradle packages - uses: actions/cache@704facf57e6136b1bc63b828d79edcd491f0ee84 # v3.3.2 + uses: actions/cache@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0 with: path: ~/.gradle/caches key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }} @@ -40,14 +40,14 @@ jobs: - name: Build RxJava run: ./gradlew build --stacktrace --no-daemon - name: Upload Snapshot - run: ./gradlew -PreleaseMode=branch publish --no-daemon --no-parallel --stacktrace + run: ./gradlew -PreleaseMode=branch publishAllPublicationsToMavenCentralRepository --no-daemon --no-parallel --stacktrace env: # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions # ------------------------------------------------------------------------------ ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }} ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }} - name: Upload to Codecov - uses: codecov/codecov-action@eaaf4bedf32dbdc6b720b63067d99c4d77d6047d # v3.1.4 + uses: codecov/codecov-action@5a1091511ad55cbe89839c7260b706298ca349f7 # v5.5.1 - name: Push Javadoc run: ./push_javadoc.sh # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions diff --git a/.github/workflows/scorecard.yml b/.github/workflows/scorecard.yml index 88e268e8b2..f19e5cc7d1 100644 --- a/.github/workflows/scorecard.yml +++ b/.github/workflows/scorecard.yml @@ -24,12 +24,12 @@ jobs: steps: - name: "Checkout code" - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 + uses: actions/checkout@93cb6efe18208431cddfb8368fd83d5badbf9bfd # v5.0.1 with: persist-credentials: false - name: "Run analysis" - uses: ossf/scorecard-action@0864cf19026789058feabb7e87baa5f140aac736 # v2.3.1 + uses: ossf/scorecard-action@4eaacf0543bb3f2c246792bd56e8cdeffafb205a # v2.4.3 with: results_file: results.sarif results_format: sarif @@ -46,7 +46,7 @@ jobs: # Upload the results as artifacts (optional). Commenting out will disable uploads of run results in SARIF # format to the repository Actions tab. - name: "Upload artifact" - uses: actions/upload-artifact@c7d193f32edcb7bfad88892161225aeda64e9392 # v4.0.0 + uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0 with: name: SARIF file path: results.sarif @@ -54,6 +54,6 @@ jobs: # Upload the results to GitHub's code scanning dashboard. - name: "Upload to code-scanning" - uses: github/codeql-action/upload-sarif@03e7845b7bfcd5e7fb63d1ae8c61b0e791134fab # v2.22.11 + uses: github/codeql-action/upload-sarif@e12f0178983d466f2f6028f5cc7a6d786fd97f4b # v3.29.5 with: sarif_file: results.sarif diff --git a/README.md b/README.md index a62a861dc4..5276c0bd37 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![codecov.io](http://codecov.io/github/ReactiveX/RxJava/coverage.svg?branch=3.x)](https://codecov.io/gh/ReactiveX/RxJava/branch/3.x) -[![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava/badge.svg)](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava) +[![Maven Central](https://maven-badges.sml.io/sonatype-central/io.reactivex.rxjava3/rxjava/badge.svg)](https://maven-badges.sml.io/sonatype-central/io.reactivex.rxjava3/rxjava) [![Contribute with Gitpod](https://img.shields.io/badge/Contribute%20with-Gitpod-908a85?logo=gitpod)](https://gitpod.io/#https://github.com/ReactiveX/RxJava) [![OpenSSF Scorecard](https://api.securityscorecards.dev/projects/github.com/ReactiveX/RxJava/badge)](https://securityscorecards.dev/viewer/?uri=github.com/ReactiveX/RxJava) @@ -48,7 +48,7 @@ The first step is to include RxJava 3 into your project, for example, as a Gradl implementation "io.reactivex.rxjava3:rxjava:3.x.y" ``` -(Please replace `x` and `y` with the latest version numbers: [![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava/badge.svg)](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava) +(Please replace `x` and `y` with the latest version numbers: [![Maven Central](https://maven-badges.sml.io/sonatype-central/io.reactivex.rxjava3/rxjava/badge.svg)](https://maven-badges.sml.io/sonatype-central/io.reactivex.rxjava3/rxjava) ) ### Hello World @@ -510,7 +510,7 @@ For further details, consult the [wiki](https://github.com/ReactiveX/RxJava/wiki - Google Group: [RxJava](http://groups.google.com/d/forum/rxjava) - Twitter: [@RxJava](http://twitter.com/RxJava) - [GitHub Issues](https://github.com/ReactiveX/RxJava/issues) -- StackOverflow: [rx-java](http://stackoverflow.com/questions/tagged/rx-java) and [rx-java2](http://stackoverflow.com/questions/tagged/rx-java2) +- StackOverflow: [rx-java](http://stackoverflow.com/questions/tagged/rx-java), [rx-java2](http://stackoverflow.com/questions/tagged/rx-java2) and [rx-java3](http://stackoverflow.com/questions/tagged/rx-java3) - [Gitter.im](https://gitter.im/ReactiveX/RxJava) ## Versioning @@ -571,11 +571,11 @@ and for Ivy: ### Snapshots -Snapshots after May 1st, 2021 are available via https://oss.sonatype.org/content/repositories/snapshots/io/reactivex/rxjava3/rxjava/ +Snapshots after May 19st, 2025 are available via https://central.sonatype.com/repository/maven-snapshots/io/reactivex/rxjava3/rxjava/ ```groovy repositories { - maven { url 'https://oss.sonatype.org/content/repositories/snapshots' } + maven { url 'https://central.sonatype.com/repository/maven-snapshots' } } dependencies { @@ -583,7 +583,7 @@ dependencies { } ``` -JavaDoc snapshots are available at http://reactivex.io/RxJava/3.x/javadoc/snapshot +JavaDoc snapshots are available at https://reactivex.io/RxJava/3.x/javadoc/snapshot ## Build @@ -618,5 +618,5 @@ For bugs, questions and discussions please use the [Github Issues](https://githu See the License for the specific language governing permissions and limitations under the License. -[beta source link]: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/annotations/Beta.java -[experimental source link]: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/annotations/Experimental.java +[beta source link]: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/annotations/Beta.java +[experimental source link]: https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/annotations/Experimental.java diff --git a/build.gradle b/build.gradle index ac76e52151..8bcfddb717 100644 --- a/build.gradle +++ b/build.gradle @@ -4,12 +4,13 @@ plugins { id("eclipse") id("jacoco") id("maven-publish") - id("ru.vyarus.animalsniffer") version "1.7.1" - id("me.champeau.gradle.jmh") version "0.5.3" + id("ru.vyarus.animalsniffer") version "2.0.1" + id("me.champeau.jmh") version "0.7.3" id("com.github.hierynomus.license") version "0.16.1" id("biz.aQute.bnd.builder") version "6.4.0" - id("com.vanniktech.maven.publish") version "0.19.0" - id("org.beryx.jar") version "1.2.0" + id("com.vanniktech.maven.publish") version "0.33.0" + id("org.beryx.jar") version "2.0.0" + id("signing") } ext { @@ -18,7 +19,7 @@ ext { testNgVersion = "7.5" mockitoVersion = "4.11.0" jmhLibVersion = "1.21" - guavaVersion = "33.0.0-jre" + guavaVersion = "33.5.0-jre" } def releaseTag = System.getenv("BUILD_TAG") @@ -49,7 +50,16 @@ dependencies { testImplementation "com.google.guava:guava:$guavaVersion" } +def buildWith11 = System.getenv("BUILD_WITH_11") java { + toolchain { + vendor = JvmVendorSpec.ADOPTIUM + if ("true".equals(buildWith11)) { + languageVersion = JavaLanguageVersion.of(11) + } else { + languageVersion = JavaLanguageVersion.of(8) + } + } sourceCompatibility = JavaVersion.VERSION_1_8 targetCompatibility = JavaVersion.VERSION_1_8 } @@ -86,12 +96,19 @@ animalsniffer { annotation = "io.reactivex.rxjava3.internal.util.SuppressAnimalSniffer" } +moduleConfig { + moduleInfoPath = 'src/main/module/module-info.java' + multiReleaseVersion = 9 + version = project.version +} + jar { from('.') { include 'LICENSE' include 'COPYRIGHT' into('META-INF/') } + exclude("module-info.class") // Cover for bnd still not supporting MR Jars: https://github.com/bndtools/bnd/issues/2227 bnd('-fixupmessages': '^Classes found in the wrong directory: \\\\{META-INF/versions/9/module-info\\\\.class=module-info}$') @@ -106,8 +123,6 @@ jar { "Bundle-SymbolicName": "io.reactivex.rxjava3.rxjava", "Multi-Release": "true" ) - - moduleInfoPath = 'src/main/module/module-info.java' } license { @@ -126,8 +141,8 @@ jmh { jvmArgsAppend = ["-Djmh.separateClasspathJAR=true"] if (project.hasProperty("jmh")) { - include = [".*" + project.jmh + ".*"] - logger.info("JMH: {}", include) + includes = [".*" + project.jmh + ".*"] + logger.info("JMH: {}", includes) } } @@ -166,8 +181,9 @@ jacocoTestReport { dependsOn testNG reports { - xml.enabled = true - html.enabled = true + xml.required.set(true) + csv.required.set(false) + html.required.set(true) } } @@ -179,44 +195,25 @@ checkstyle { "checkstyle.suppressions.file": project.file("config/checkstyle/suppressions.xml"), "checkstyle.header.file" : project.file("config/license/HEADER_JAVA") ] + checkstyleMain.exclude '**/module-info.java' } if (project.hasProperty("releaseMode")) { logger.lifecycle("ReleaseMode: {}", project.releaseMode) - /* - if ("branch" == project.releaseMode) { - - if (version.endsWith("-SNAPSHOT")) { - publishing { - repositories { - maven { - url = "https://s01.oss.sonatype.org/content/repositories/snapshots/" - } - } - } - - mavenPublish { - nexus { - stagingProfile = "io.reactivex" - } - } - } - } - */ if ("full" == project.releaseMode) { signing { if (project.hasProperty("SIGNING_PRIVATE_KEY") && project.hasProperty("SIGNING_PASSWORD")) { useInMemoryPgpKeys(project.getProperty("SIGNING_PRIVATE_KEY"), project.getProperty("SIGNING_PASSWORD")) + sign(publishing.publications) } } - /* - mavenPublish { - nexus { - stagingProfile = "io.reactivex" - } - } - */ + } + mavenPublishing { + // or when publishing to https://central.sonatype.com/ + publishToMavenCentral(com.vanniktech.maven.publish.SonatypeHost.CENTRAL_PORTAL) + + // signAllPublications() } } diff --git a/docs/Additional-Reading.md b/docs/Additional-Reading.md index 85e7d47077..4badd81308 100644 --- a/docs/Additional-Reading.md +++ b/docs/Additional-Reading.md @@ -3,7 +3,7 @@ A more complete and up-to-date list of resources can be found at the [reactivex. # Introducing Reactive Programming * [Introduction to Rx](http://www.introtorx.com/): a free, on-line book by Lee Campbell **(1.x)** * [The introduction to Reactive Programming you've been missing](https://gist.github.com/staltz/868e7e9bc2a7b8c1f754) by Andre Staltz -* [Mastering Observables](http://docs.couchbase.com/developer/java-2.0/observables.html) from the Couchbase documentation **(1.x)** +* [Mastering Observables](https://docs.huihoo.com/couchbase/developer-guide/java-2.0/observables.html) from the Couchbase documentation **(1.x)** * [Reactive Programming in Java 8 With RxJava](http://pluralsight.com/training/Courses/TableOfContents/reactive-programming-java-8-rxjava), a course designed by Russell Elledge **(1.x)** * [33rd Degree Reactive Java](http://www.slideshare.net/tkowalcz/33rd-degree-reactive-java) by Tomasz Kowalczewski **(1.x)** * [What Every Hipster Should Know About Functional Reactive Programming](http://www.infoq.com/presentations/game-functional-reactive-programming) - Bodil Stokke demos the creation of interactive game mechanics in RxJS diff --git a/docs/Backpressure-(2.0).md b/docs/Backpressure-(2.0).md index 61361d21c4..6b2f2860af 100644 --- a/docs/Backpressure-(2.0).md +++ b/docs/Backpressure-(2.0).md @@ -172,7 +172,7 @@ If some of the values can be safely ignored, one can use the sampling (with time } ``` -Note hovewer that these operators only reduce the rate of value reception by the downstream and thus they may still lead to `MissingBackpressureException`. +Note however that these operators only reduce the rate of value reception by the downstream and thus they may still lead to `MissingBackpressureException`. ## onBackpressureBuffer() @@ -229,7 +229,7 @@ Note that the last two strategies cause discontinuity in the stream as they drop ## onBackpressureDrop() -Whenever the downstream is not ready to receive values, this operator will drop that elemenet from the sequence. One can think of it as a 0 capacity `onBackpressureBuffer` with strategy `ON_OVERFLOW_DROP_LATEST`. +Whenever the downstream is not ready to receive values, this operator will drop that element from the sequence. One can think of it as a 0 capacity `onBackpressureBuffer` with strategy `ON_OVERFLOW_DROP_LATEST`. This operator is useful when one can safely ignore values from a source (such as mouse moves or current GPS location signals) as there will be more up-to-date values later on. diff --git a/docs/Filtering-Observables.md b/docs/Filtering-Observables.md index 620800dc8c..512b69ba8a 100644 --- a/docs/Filtering-Observables.md +++ b/docs/Filtering-Observables.md @@ -259,7 +259,7 @@ firstOrError.subscribe( **ReactiveX documentation:** [http://reactivex.io/documentation/operators/ignoreelements.html](http://reactivex.io/documentation/operators/ignoreelements.html) -Ignores the single item emitted by a `Single` or `Maybe` source, and returns a `Completable` that signals only the error or completion event from the the source. +Ignores the single item emitted by a `Single` or `Maybe` source, and returns a `Completable` that signals only the error or completion event from the source. ### ignoreElement example diff --git a/docs/Getting-Started.md b/docs/Getting-Started.md index d2ac2e4640..69b69a8ca6 100644 --- a/docs/Getting-Started.md +++ b/docs/Getting-Started.md @@ -66,18 +66,21 @@ You need Java 6 or later. ### Snapshots -Snapshots are available via [JFrog](https://oss.jfrog.org/libs-snapshot/io/reactivex/rxjava3/rxjava/): +Snapshots after May 1st, 2021 are available via https://oss.sonatype.org/content/repositories/snapshots/io/reactivex/rxjava3/rxjava/ ```groovy repositories { - maven { url 'https://oss.jfrog.org/libs-snapshot' } + maven { url 'https://oss.sonatype.org/content/repositories/snapshots' } } dependencies { - compile 'io.reactivex.rxjava3:rxjava:3.0.4' + implementation 'io.reactivex.rxjava3:rxjava:3.0.0-SNAPSHOT' } ``` +JavaDoc snapshots are available at http://reactivex.io/RxJava/3.x/javadoc/snapshot + + ## Building To check out and build the RxJava source, issue the following commands: diff --git a/docs/Phantom-Operators.md b/docs/Phantom-Operators.md index b01ac28ff2..5193147a22 100644 --- a/docs/Phantom-Operators.md +++ b/docs/Phantom-Operators.md @@ -127,7 +127,7 @@ streamOfItems.flatMap(item -> { itemToObservable(item).subscribeOn(Schedulers.io()); }); ``` -Kick off your work for each item inside [`flatMap`](Transforming-Observables#flatmap-concatmap-and-flatmapiterable) using [`subscribeOn`](Observable-Utility-Operators#subscribeon) to make it asynchronous, or by using a function that already makes asychronous calls. +Kick off your work for each item inside [`flatMap`](Transforming-Observables#flatmap-concatmap-and-flatmapiterable) using [`subscribeOn`](Observable-Utility-Operators#subscribeon) to make it asynchronous, or by using a function that already makes asynchronous calls. #### see also: * RxJava Threading Examples by Graham Lea diff --git a/docs/What's-different-in-2.0.md b/docs/What's-different-in-2.0.md index fac50df56d..edc681fa7f 100644 --- a/docs/What's-different-in-2.0.md +++ b/docs/What's-different-in-2.0.md @@ -450,12 +450,12 @@ Before 2.0.7, the operator `strict()` had to be applied in order to achieve the As one of the primary goals of RxJava 2, the design focuses on performance and in order enable it, RxJava 2.0.7 adds a custom `io.reactivex.FlowableSubscriber` interface (extends `org.reactivestreams.Subscriber`) but adds no new methods to it. The new interface is **constrained to RxJava 2** and represents a consumer to `Flowable` that is able to work in a mode that relaxes the Reactive-Streams version 1.0.0 specification in rules §1.3, §2.3, §2.12 and §3.9: - - §1.3 relaxation: `onSubscribe` may run concurrently with `onNext` in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the resposibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`. + - §1.3 relaxation: `onSubscribe` may run concurrently with `onNext` in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the responsibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`. - §2.3 relaxation: calling `Subscription.cancel` and `Subscription.request` from `FlowableSubscriber.onComplete()` or `FlowableSubscriber.onError()` is considered a no-operation. - §2.12 relaxation: if the same `FlowableSubscriber` instance is subscribed to multiple sources, it must ensure its `onXXX` methods remain thread safe. - §3.9 relaxation: issuing a non-positive `request()` will not stop the current stream but signal an error via `RxJavaPlugins.onError`. -From a user's perspective, if one was using the the `subscribe` methods other than `Flowable.subscribe(Subscriber)`, there is no need to do anything regarding this change and there is no extra penalty for it. +From a user's perspective, if one was using the `subscribe` methods other than `Flowable.subscribe(Subscriber)`, there is no need to do anything regarding this change and there is no extra penalty for it. If one was using `Flowable.subscribe(Subscriber)` with the built-in RxJava `Subscriber` implementations such as `DisposableSubscriber`, `TestSubscriber` and `ResourceSubscriber`, there is a small runtime overhead (one `instanceof` check) associated when the code is not recompiled against 2.0.7. diff --git a/docs/Writing-operators-for-2.0.md b/docs/Writing-operators-for-2.0.md index e8486564b1..1a51664880 100644 --- a/docs/Writing-operators-for-2.0.md +++ b/docs/Writing-operators-for-2.0.md @@ -565,7 +565,7 @@ Version 2.0.7 introduced a new interface, `FlowableSubscriber` that extends `Sub The rule relaxations are as follows: -- §1.3 relaxation: `onSubscribe` may run concurrently with onNext in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the resposibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`. +- §1.3 relaxation: `onSubscribe` may run concurrently with onNext in case the `FlowableSubscriber` calls `request()` from inside `onSubscribe` and it is the responsibility of `FlowableSubscriber` to ensure thread-safety between the remaining instructions in `onSubscribe` and `onNext`. - §2.3 relaxation: calling `Subscription.cancel` and `Subscription.request` from `FlowableSubscriber.onComplete()` or `FlowableSubscriber.onError()` is considered a no-operation. - §2.12 relaxation: if the same `FlowableSubscriber` instance is subscribed to multiple sources, it must ensure its `onXXX` methods remain thread safe. - §3.9 relaxation: issuing a non-positive `request()` will not stop the current stream but signal an error via `RxJavaPlugins.onError`. diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 4e86b92707..3c44eb1b6f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip networkTimeout=10000 zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java index c4a0a385a5..6e6ae7e3c6 100644 --- a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java @@ -139,9 +139,9 @@ public Observable apply(Integer v) { } }); - singleFlatMapHideObservable = Single.just(1).flatMapObservable(new Function>() { + singleFlatMapHideObservable = Single.just(1).flatMapObservable(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return arrayObservableHide; } }); @@ -153,16 +153,16 @@ public Iterable apply(Integer v) { } }); - maybeFlatMapObservable = Maybe.just(1).flatMapObservable(new Function>() { + maybeFlatMapObservable = Maybe.just(1).flatMapObservable(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return arrayObservable; } }); - maybeFlatMapHideObservable = Maybe.just(1).flatMapObservable(new Function>() { + maybeFlatMapHideObservable = Maybe.just(1).flatMapObservable(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return arrayObservableHide; } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java index 87ee5a07e4..4310ea2e95 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java @@ -60,9 +60,9 @@ public Publisher apply(Integer v) { } }); - flowableDedicated = source.concatMapMaybe(new Function>() { + flowableDedicated = source.concatMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java index 8ab19a00c6..699f76c074 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java @@ -60,9 +60,9 @@ public Publisher apply(Integer v) { } }); - flowableDedicated = source.flatMapMaybe(new Function>() { + flowableDedicated = source.flatMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.empty(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java index d0f3730b42..f81ed10ec3 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java @@ -60,9 +60,9 @@ public Publisher apply(Integer v) { } }); - flowableDedicated = source.flatMapMaybe(new Function>() { + flowableDedicated = source.flatMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java index 4f50938647..5a92bf20ff 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java @@ -60,9 +60,9 @@ public Publisher apply(Integer v) { } }); - flowableDedicated = source.flatMapSingle(new Function>() { + flowableDedicated = source.flatMapSingle(new Function>() { @Override - public Single apply(Integer v) { + public Single apply(Integer v) { return Single.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java index 83ad00e0f9..46ce694f6d 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java @@ -60,9 +60,9 @@ public Publisher apply(Integer v) { } }); - flowableDedicated = source.switchMapMaybe(new Function>() { + flowableDedicated = source.switchMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.empty(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java index e36b49c4d3..e96bbc3919 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java @@ -60,9 +60,9 @@ public Publisher apply(Integer v) { } }); - flowableDedicated = source.switchMapMaybe(new Function>() { + flowableDedicated = source.switchMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java index 0da6941895..ef06ebfa66 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java @@ -60,9 +60,9 @@ public Publisher apply(Integer v) { } }); - flowableDedicated = source.switchMapSingle(new Function>() { + flowableDedicated = source.switchMapSingle(new Function>() { @Override - public Single apply(Integer v) { + public Single apply(Integer v) { return Single.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java index 48b20dc005..2229eed77a 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java @@ -45,16 +45,16 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.concatMap(new Function>() { + observablePlain = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.empty(); } }); - observableConvert = source.concatMap(new Function>() { + observableConvert = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Completable.complete().toObservable(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java index 4528c90b50..cfde5183e5 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.concatMap(new Function>() { + observablePlain = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.empty(); } }); - concatMapToObservableEmpty = source.concatMap(new Function>() { + concatMapToObservableEmpty = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Maybe.empty().toObservable(); } }); - observableDedicated = source.concatMapMaybe(new Function>() { + observableDedicated = source.concatMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.empty(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java index 204020abfe..75e7506724 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.concatMap(new Function>() { + observablePlain = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.just(v); } }); - observableConvert = source.concatMap(new Function>() { + observableConvert = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Maybe.just(v).toObservable(); } }); - observableDedicated = source.concatMapMaybe(new Function>() { + observableDedicated = source.concatMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java index e2e34b24f5..4227791222 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.concatMap(new Function>() { + observablePlain = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.just(v); } }); - observableConvert = source.concatMap(new Function>() { + observableConvert = source.concatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Single.just(v).toObservable(); } }); - observableDedicated = source.concatMapSingle(new Function>() { + observableDedicated = source.concatMapSingle(new Function>() { @Override - public Single apply(Integer v) { + public Single apply(Integer v) { return Single.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java index b6daa57eb6..6a916a68f1 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java @@ -45,16 +45,16 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.flatMap(new Function>() { + observablePlain = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.empty(); } }); - observableConvert = source.flatMap(new Function>() { + observableConvert = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Completable.complete().toObservable(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java index 5d0327fa46..377a8bba93 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.flatMap(new Function>() { + observablePlain = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.empty(); } }); - observableConvert = source.flatMap(new Function>() { + observableConvert = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Maybe.empty().toObservable(); } }); - observableDedicated = source.flatMapMaybe(new Function>() { + observableDedicated = source.flatMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.empty(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java index e2a7c43bea..248ca98112 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.flatMap(new Function>() { + observablePlain = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.just(v); } }); - observableConvert = source.flatMap(new Function>() { + observableConvert = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Maybe.just(v).toObservable(); } }); - observableDedicated = source.flatMapMaybe(new Function>() { + observableDedicated = source.flatMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java index add0cd310c..880da95f5a 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.flatMap(new Function>() { + observablePlain = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.just(v); } }); - observableConvert = source.flatMap(new Function>() { + observableConvert = source.flatMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Single.just(v).toObservable(); } }); - observableDedicated = source.flatMapSingle(new Function>() { + observableDedicated = source.flatMapSingle(new Function>() { @Override - public Single apply(Integer v) { + public Single apply(Integer v) { return Single.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java index 69b8e71f18..41964c3dbd 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java @@ -45,16 +45,16 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.switchMap(new Function>() { + observablePlain = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.empty(); } }); - observableConvert = source.switchMap(new Function>() { + observableConvert = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Completable.complete().toObservable(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java index 3930534eb8..6a4ea5c73b 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.switchMap(new Function>() { + observablePlain = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.empty(); } }); - observableConvert = source.switchMap(new Function>() { + observableConvert = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Maybe.empty().toObservable(); } }); - observableDedicated = source.switchMapMaybe(new Function>() { + observableDedicated = source.switchMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.empty(); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java index 30158d012d..f0c3285890 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.switchMap(new Function>() { + observablePlain = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.just(v); } }); - observableConvert = source.switchMap(new Function>() { + observableConvert = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Maybe.just(v).toObservable(); } }); - observableDedicated = source.switchMapMaybe(new Function>() { + observableDedicated = source.switchMapMaybe(new Function>() { @Override - public Maybe apply(Integer v) { + public Maybe apply(Integer v) { return Maybe.just(v); } }); diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java index 75aeb504f9..087f32c8e3 100644 --- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java +++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java @@ -45,23 +45,23 @@ public void setup() { Observable source = Observable.fromArray(sourceArray); - observablePlain = source.switchMap(new Function>() { + observablePlain = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Observable.just(v); } }); - observableConvert = source.switchMap(new Function>() { + observableConvert = source.switchMap(new Function>() { @Override - public Observable apply(Integer v) { + public Observable apply(Integer v) { return Single.just(v).toObservable(); } }); - observableDedicated = source.switchMapSingle(new Function>() { + observableDedicated = source.switchMapSingle(new Function>() { @Override - public Single apply(Integer v) { + public Single apply(Integer v) { return Single.just(v); } }); diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index fe27086f37..39f3c63b43 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -8266,7 +8266,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors @@ -8299,7 +8299,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors @@ -8339,7 +8339,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors @@ -8372,7 +8372,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors @@ -8411,7 +8411,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors @@ -8456,7 +8456,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors @@ -8489,7 +8489,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors @@ -8529,7 +8529,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors @@ -8562,7 +8562,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors @@ -8601,7 +8601,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Backpressure:
*
The operator expects the upstream to support backpressure and honors @@ -10957,6 +10957,8 @@ public final Completable flatMapCompletable(@NonNull Function + * *
*
Backpressure:
*
The operator consumes the upstream in an unbounded manner.
@@ -10980,6 +10982,8 @@ public final Completable flatMapCompletable(@NonNull Function + * *
*
Backpressure:
*
If {@code maxConcurrency == }{@link Integer#MAX_VALUE} the operator consumes the upstream in an unbounded manner. diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index 14b2795715..fcf809cdf6 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -7329,7 +7329,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Scheduler:
*
{@code concatMapMaybe} does not operate by default on a particular {@link Scheduler}.
@@ -7357,7 +7357,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Scheduler:
*
{@code concatMapMaybe} does not operate by default on a particular {@link Scheduler}.
@@ -7390,7 +7390,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Scheduler:
*
{@code concatMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -7418,7 +7418,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Scheduler:
*
{@code concatMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -7452,7 +7452,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Scheduler:
*
{@code concatMapMaybeDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -7490,7 +7490,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Scheduler:
*
{@code concatMapSingle} does not operate by default on a particular {@link Scheduler}.
@@ -7518,7 +7518,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Scheduler:
*
{@code concatMapSingle} does not operate by default on a particular {@link Scheduler}.
@@ -7551,7 +7551,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Scheduler:
*
{@code concatMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -7579,7 +7579,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Scheduler:
*
{@code concatMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
@@ -7613,7 +7613,7 @@ public final Completable concatMapCompletableDelayError(@NonNull Function - * + * *
*
Scheduler:
*
{@code concatMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.
diff --git a/src/main/java/io/reactivex/rxjava3/core/Scheduler.java b/src/main/java/io/reactivex/rxjava3/core/Scheduler.java index 580e2739cb..3aa001127a 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Scheduler.java +++ b/src/main/java/io/reactivex/rxjava3/core/Scheduler.java @@ -350,7 +350,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial * }); * * - * Slowing down the rate to no more than than 1 a second. This suffers from + * Slowing down the rate to no more than 1 a second. This suffers from * the same problem as the one above I could find an {@link Flowable} * operator that limits the rate without dropping the values (aka leaky * bucket algorithm). diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java index 36436d1370..11239d04a7 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java @@ -138,9 +138,12 @@ public void onSubscribe(Subscription s) { @Override public void onNext(T t) { if (!queue.offer(t)) { + // Error must be set first before calling cancel to avoid race + // with hasNext(), which checks for cancel first before checking + // for error. + error = new QueueOverflowException(); SubscriptionHelper.cancel(this); - - onError(new QueueOverflowException()); + onComplete(); } else { signalConsumer(); } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java index 469d0dd48b..a7de73213a 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java @@ -111,7 +111,9 @@ final class OnNext implements Runnable { @Override public void run() { - downstream.onNext(t); + if (!w.isDisposed()) { + downstream.onNext(t); + } } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java index daa9edd533..99e11259a6 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java @@ -24,23 +24,7 @@ * * @param the source element type */ -public final class ObservableCache extends AbstractObservableWithUpstream -implements Observer { - - /** - * The subscription to the source should happen at most once. - */ - final AtomicBoolean once; - - /** - * The number of items per cached nodes. - */ - final int capacityHint; - - /** - * The current known array of observer state to notify. - */ - final AtomicReference[]> observers; +public final class ObservableCache extends AbstractObservableWithUpstream { /** * A shared instance of an empty array of observers to avoid creating @@ -56,61 +40,49 @@ public final class ObservableCache extends AbstractObservableWithUpstream head; - - /** - * The current tail of the linked structure holding the items. - */ - Node tail; - - /** - * How many items have been put into the tail node so far. + * The subscription to the source should happen at most once. */ - int tailOffset; + final AtomicBoolean once; /** - * If {@link #observers} is {@link #TERMINATED}, this holds the terminal error if not null. + * Responsible caching events from the source and multicasting them to each downstream. */ - Throwable error; + final Multicaster multicaster; /** - * True if the source has terminated. + * The first node in a singly linked list. Each node has the capacity to hold a specific number of events, and each + * points exclusively to the next node (if present). When a new downstream arrives, the subscription is + * initialized with a reference to the "head" node, and any events present in the linked list are replayed. As + * events are replayed to the new downstream, its 'node' reference advances through the linked list, discarding each + * node reference once all events in that node have been replayed. Consequently, once {@code this} instance goes out + * of scope, the prefix of nodes up to the first node that is still being replayed becomes unreachable and eligible + * for collection. */ - volatile boolean done; + final Node head; /** * Constructs an empty, non-connected cache. * @param source the source to subscribe to for the first incoming observer * @param capacityHint the number of items expected (reduce allocation frequency) */ - @SuppressWarnings("unchecked") public ObservableCache(Observable source, int capacityHint) { super(source); - this.capacityHint = capacityHint; this.once = new AtomicBoolean(); Node n = new Node<>(capacityHint); this.head = n; - this.tail = n; - this.observers = new AtomicReference<>(EMPTY); + this.multicaster = new Multicaster<>(capacityHint, n); } @Override protected void subscribeActual(Observer t) { - CacheDisposable consumer = new CacheDisposable<>(t, this); + CacheDisposable consumer = new CacheDisposable<>(t, multicaster, head); t.onSubscribe(consumer); - add(consumer); + multicaster.add(consumer); if (!once.get() && once.compareAndSet(false, true)) { - source.subscribe(this); + source.subscribe(multicaster); } else { - replay(consumer); + multicaster.replay(consumer); } } @@ -127,7 +99,7 @@ protected void subscribeActual(Observer t) { * @return true if the cache has observers */ /* public */ boolean hasObservers() { - return observers.get().length != 0; + return multicaster.get().length != 0; } /** @@ -135,194 +107,241 @@ protected void subscribeActual(Observer t) { * @return the number of currently cached event count */ /* public */ long cachedEventCount() { - return size; + return multicaster.size; } - /** - * Atomically adds the consumer to the {@link #observers} copy-on-write array - * if the source has not yet terminated. - * @param consumer the consumer to add - */ - void add(CacheDisposable consumer) { - for (;;) { - CacheDisposable[] current = observers.get(); - if (current == TERMINATED) { - return; - } - int n = current.length; + static final class Multicaster extends AtomicReference[]> implements Observer { - @SuppressWarnings("unchecked") - CacheDisposable[] next = new CacheDisposable[n + 1]; - System.arraycopy(current, 0, next, 0, n); - next[n] = consumer; + /** */ + private static final long serialVersionUID = 8514643269016498691L; - if (observers.compareAndSet(current, next)) { - return; - } - } - } + /** + * The number of items per cached nodes. + */ + final int capacityHint; - /** - * Atomically removes the consumer from the {@link #observers} copy-on-write array. - * @param consumer the consumer to remove - */ - @SuppressWarnings("unchecked") - void remove(CacheDisposable consumer) { - for (;;) { - CacheDisposable[] current = observers.get(); - int n = current.length; - if (n == 0) { - return; - } + /** + * The total number of elements in the list available for reads. + */ + volatile long size; - int j = -1; - for (int i = 0; i < n; i++) { - if (current[i] == consumer) { - j = i; - break; - } - } + /** + * The current tail of the linked structure holding the items. + */ + Node tail; - if (j < 0) { - return; - } - CacheDisposable[] next; + /** + * How many items have been put into the tail node so far. + */ + int tailOffset; - if (n == 1) { - next = EMPTY; - } else { - next = new CacheDisposable[n - 1]; - System.arraycopy(current, 0, next, 0, j); - System.arraycopy(current, j + 1, next, j, n - j - 1); - } + /** + * If the observers are {@link #TERMINATED}, this holds the terminal error if not null. + */ + Throwable error; - if (observers.compareAndSet(current, next)) { - return; - } - } - } + /** + * True if the source has terminated. + */ + volatile boolean done; - /** - * Replays the contents of this cache to the given consumer based on its - * current state and number of items requested by it. - * @param consumer the consumer to continue replaying items to - */ - void replay(CacheDisposable consumer) { - // make sure there is only one replay going on at a time - if (consumer.getAndIncrement() != 0) { - return; + @SuppressWarnings("unchecked") + Multicaster(int capacityHint, final Node head) { + super(EMPTY); + this.tail = head; + this.capacityHint = capacityHint; } - // see if there were more replay request in the meantime - int missed = 1; - // read out state into locals upfront to avoid being re-read due to volatile reads - long index = consumer.index; - int offset = consumer.offset; - Node node = consumer.node; - Observer downstream = consumer.downstream; - int capacity = capacityHint; - - for (;;) { - // if the consumer got disposed, clear the node and quit - if (consumer.disposed) { - consumer.node = null; - return; + /** + * Atomically adds the consumer to the observers copy-on-write array + * if the source has not yet terminated. + * @param consumer the consumer to add + */ + void add(CacheDisposable consumer) { + for (;;) { + CacheDisposable[] current = get(); + if (current == TERMINATED) { + return; + } + int n = current.length; + + @SuppressWarnings("unchecked") + CacheDisposable[] next = new CacheDisposable[n + 1]; + System.arraycopy(current, 0, next, 0, n); + next[n] = consumer; + + if (compareAndSet(current, next)) { + return; + } } + } - // first see if the source has terminated, read order matters! - boolean sourceDone = done; - // and if the number of items is the same as this consumer has received - boolean empty = size == index; - - // if the source is done and we have all items so far, terminate the consumer - if (sourceDone && empty) { - // release the node object to avoid leaks through retained consumers - consumer.node = null; - // if error is not null then the source failed - Throwable ex = error; - if (ex != null) { - downstream.onError(ex); + /** + * Atomically removes the consumer from the observers copy-on-write array. + * @param consumer the consumer to remove + */ + @SuppressWarnings("unchecked") + void remove(CacheDisposable consumer) { + for (;;) { + CacheDisposable[] current = get(); + int n = current.length; + if (n == 0) { + return; + } + + int j = -1; + for (int i = 0; i < n; i++) { + if (current[i] == consumer) { + j = i; + break; + } + } + + if (j < 0) { + return; + } + CacheDisposable[] next; + + if (n == 1) { + next = EMPTY; } else { - downstream.onComplete(); + next = new CacheDisposable[n - 1]; + System.arraycopy(current, 0, next, 0, j); + System.arraycopy(current, j + 1, next, j, n - j - 1); } + + if (compareAndSet(current, next)) { + return; + } + } + } + + /** + * Replays the contents of this cache to the given consumer based on its + * current state and number of items requested by it. + * @param consumer the consumer to continue replaying items to + */ + void replay(CacheDisposable consumer) { + // make sure there is only one replay going on at a time + if (consumer.getAndIncrement() != 0) { return; } - // there are still items not sent to the consumer - if (!empty) { - // if the offset in the current node has reached the node capacity - if (offset == capacity) { - // switch to the subsequent node - node = node.next; - // reset the in-node offset - offset = 0; + // see if there were more replay request in the meantime + int missed = 1; + // read out state into locals upfront to avoid being re-read due to volatile reads + long index = consumer.index; + int offset = consumer.offset; + Node node = consumer.node; + Observer downstream = consumer.downstream; + int capacity = capacityHint; + + for (;;) { + // if the consumer got disposed, clear the node and quit + if (consumer.disposed) { + consumer.node = null; + return; } - // emit the cached item - downstream.onNext(node.values[offset]); - - // move the node offset forward - offset++; - // move the total consumed item count forward - index++; + // first see if the source has terminated, read order matters! + boolean sourceDone = done; + // and if the number of items is the same as this consumer has received + boolean empty = size == index; + + // if the source is done and we have all items so far, terminate the consumer + if (sourceDone && empty) { + // release the node object to avoid leaks through retained consumers + consumer.node = null; + // if error is not null then the source failed + Throwable ex = error; + if (ex != null) { + downstream.onError(ex); + } else { + downstream.onComplete(); + } + return; + } - // retry for the next item/terminal event if any - continue; - } + // there are still items not sent to the consumer + if (!empty) { + // if the offset in the current node has reached the node capacity + if (offset == capacity) { + // switch to the subsequent node + node = node.next; + // reset the in-node offset + offset = 0; + } + + // emit the cached item + downstream.onNext(node.values[offset]); + + // move the node offset forward + offset++; + // move the total consumed item count forward + index++; + + // retry for the next item/terminal event if any + continue; + } - // commit the changed references back - consumer.index = index; - consumer.offset = offset; - consumer.node = node; - // release the changes and see if there were more replay request in the meantime - missed = consumer.addAndGet(-missed); - if (missed == 0) { - break; + // commit the changed references back + consumer.index = index; + consumer.offset = offset; + consumer.node = node; + // release the changes and see if there were more replay request in the meantime + missed = consumer.addAndGet(-missed); + if (missed == 0) { + break; + } } } - } - @Override - public void onSubscribe(Disposable d) { - // we can't do much with the upstream disposable - } - - @Override - public void onNext(T t) { - int tailOffset = this.tailOffset; - // if the current tail node is full, create a fresh node - if (tailOffset == capacityHint) { - Node n = new Node<>(tailOffset); - n.values[0] = t; - this.tailOffset = 1; - tail.next = n; - tail = n; - } else { - tail.values[tailOffset] = t; - this.tailOffset = tailOffset + 1; + @Override + public void onSubscribe(Disposable d) { + // we can't do much with the upstream disposable } - size++; - for (CacheDisposable consumer : observers.get()) { - replay(consumer); + + @Override + public void onNext(T t) { + int tailOffset = this.tailOffset; + // if the current tail node is full, create a fresh node + if (tailOffset == capacityHint) { + Node n = new Node<>(tailOffset); + n.values[0] = t; + this.tailOffset = 1; + tail.next = n; + tail = n; + } else { + tail.values[tailOffset] = t; + this.tailOffset = tailOffset + 1; + } + size++; + for (CacheDisposable consumer : get()) { + replay(consumer); + } } - } - @SuppressWarnings("unchecked") - @Override - public void onError(Throwable t) { - error = t; - done = true; - for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) { - replay(consumer); + @SuppressWarnings("unchecked") + @Override + public void onError(Throwable t) { + error = t; + done = true; + // No additional events will arrive, so now we can clear the 'tail' reference + tail = null; + for (CacheDisposable consumer : getAndSet(TERMINATED)) { + replay(consumer); + } } - } - @SuppressWarnings("unchecked") - @Override - public void onComplete() { - done = true; - for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) { - replay(consumer); + @SuppressWarnings("unchecked") + @Override + public void onComplete() { + done = true; + // No additional events will arrive, so now we can clear the 'tail' reference + tail = null; + for (CacheDisposable consumer : getAndSet(TERMINATED)) { + replay(consumer); + } } } @@ -338,7 +357,7 @@ static final class CacheDisposable extends AtomicInteger final Observer downstream; - final ObservableCache parent; + final Multicaster parent; Node node; @@ -353,11 +372,12 @@ static final class CacheDisposable extends AtomicInteger * the parent cache object. * @param downstream the actual consumer * @param parent the parent that holds onto the cached items + * @param head the first node in the linked list */ - CacheDisposable(Observer downstream, ObservableCache parent) { + CacheDisposable(Observer downstream, Multicaster parent, Node head) { this.downstream = downstream; this.parent = parent; - this.node = parent.head; + this.node = head; } @Override diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java index 1801cce1f2..7c01c23f90 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java @@ -111,7 +111,9 @@ final class OnNext implements Runnable { @Override public void run() { - downstream.onNext(t); + if (!w.isDisposed()) { + downstream.onNext(t); + } } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java index d735ff43c0..e8d19c633e 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java +++ b/src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java @@ -91,6 +91,8 @@ public T poll() { // we have to null out the value because we are going to hang on to the node final T nextValue = nextNode.getAndNullValue(); spConsumerNode(nextNode); + // unlink previous consumer to help gc + currConsumerNode.soNext(null); return nextValue; } else if (currConsumerNode != lvProducerNode()) { @@ -101,6 +103,8 @@ else if (currConsumerNode != lvProducerNode()) { // we have to null out the value because we are going to hang on to the node final T nextValue = nextNode.getAndNullValue(); spConsumerNode(nextNode); + // unlink previous consumer to help gc + currConsumerNode.soNext(null); return nextValue; } return null; diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java index 7c7b017988..fa0bcab7f8 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ExecutorScheduler.java @@ -204,7 +204,7 @@ public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); - ScheduledRunnable sr = new ScheduledRunnable(new SequentialDispose(mar, decoratedRun), tasks); + ScheduledRunnable sr = new ScheduledRunnable(new SequentialDispose(mar, decoratedRun), tasks, interruptibleWorker); tasks.add(sr); if (executor instanceof ScheduledExecutorService) { diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java index 14c197bccc..2a1baa641a 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledRunnable.java @@ -24,6 +24,7 @@ public final class ScheduledRunnable extends AtomicReferenceArray private static final long serialVersionUID = -6120223772001106981L; final Runnable actual; + final boolean interruptOnCancel; /** Indicates that the parent tracking this task has been notified about its completion. */ static final Object PARENT_DISPOSED = new Object(); @@ -41,12 +42,26 @@ public final class ScheduledRunnable extends AtomicReferenceArray /** * Creates a ScheduledRunnable by wrapping the given action and setting * up the optional parent. + * The underlying future will be interrupted if the task is disposed asynchronously. * @param actual the runnable to wrap, not-null (not verified) * @param parent the parent tracking container or null if none */ public ScheduledRunnable(Runnable actual, DisposableContainer parent) { + this(actual, parent, true); + } + + /** + * Creates a ScheduledRunnable by wrapping the given action and setting + * up the optional parent. + * @param actual the runnable to wrap, not-null (not verified) + * @param parent the parent tracking container or null if none + * @param interruptOnCancel if true, the underlying future will be interrupted when disposing + * this task from a different thread than it is running on. + */ + public ScheduledRunnable(Runnable actual, DisposableContainer parent, boolean interruptOnCancel) { super(3); this.actual = actual; + this.interruptOnCancel = interruptOnCancel; this.lazySet(0, parent); } @@ -95,7 +110,7 @@ public void setFuture(Future f) { return; } if (o == ASYNC_DISPOSED) { - f.cancel(true); + f.cancel(interruptOnCancel); return; } if (compareAndSet(FUTURE_INDEX, o, f)) { @@ -114,7 +129,7 @@ public void dispose() { boolean async = get(THREAD_INDEX) != Thread.currentThread(); if (compareAndSet(FUTURE_INDEX, o, async ? ASYNC_DISPOSED : SYNC_DISPOSED)) { if (o != null) { - ((Future)o).cancel(async); + ((Future)o).cancel(async && interruptOnCancel); } break; } diff --git a/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerWhen.java b/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerWhen.java index 0780aea610..814c971f1c 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerWhen.java +++ b/src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerWhen.java @@ -80,7 +80,7 @@ * } * * - * Slowing down the rate to no more than than 1 a second. This suffers from the + * Slowing down the rate to no more than 1 a second. This suffers from the * same problem as the one above I could find an {@link Observable} operator * that limits the rate without dropping the values (aka leaky bucket * algorithm). diff --git a/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java b/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java index 4d5eb3335f..e103594ba5 100644 --- a/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java +++ b/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java @@ -652,8 +652,6 @@ static final class UnboundedReplayBuffer final List buffer; - volatile boolean done; - volatile int size; UnboundedReplayBuffer(int capacityHint) { @@ -671,7 +669,6 @@ public void addFinal(Object notificationLite) { buffer.add(notificationLite); trimHead(); size++; - done = true; } @Override @@ -772,20 +769,17 @@ public void replay(ReplayDisposable rs) { Object o = b.get(index); - if (done) { - if (index + 1 == s) { - s = size; - if (index + 1 == s) { - if (NotificationLite.isComplete(o)) { - a.onComplete(); - } else { - a.onError(NotificationLite.getError(o)); - } - rs.index = null; - rs.cancelled = true; - return; - } - } + if (NotificationLite.isComplete(o)) { + a.onComplete(); + rs.index = null; + rs.cancelled = true; + return; + } else + if (NotificationLite.isError(o)) { + a.onError(NotificationLite.getError(o)); + rs.index = null; + rs.cancelled = true; + return; } a.onNext((T)o); @@ -856,8 +850,6 @@ static final class SizeBoundReplayBuffer Node tail; - volatile boolean done; - SizeBoundReplayBuffer(int maxSize) { this.maxSize = maxSize; Node h = new Node<>(null); @@ -895,7 +887,6 @@ public void addFinal(Object notificationLite) { t.lazySet(n); // releases both the tail and size trimHead(); - done = true; } /** @@ -1000,18 +991,17 @@ public void replay(ReplayDisposable rs) { Object o = n.value; - if (done) { - if (n.get() == null) { - - if (NotificationLite.isComplete(o)) { - a.onComplete(); - } else { - a.onError(NotificationLite.getError(o)); - } - rs.index = null; - rs.cancelled = true; - return; - } + if (NotificationLite.isComplete(o)) { + a.onComplete(); + rs.index = null; + rs.cancelled = true; + return; + } else + if (NotificationLite.isError(o)) { + a.onError(NotificationLite.getError(o)); + rs.index = null; + rs.cancelled = true; + return; } a.onNext((T)o); @@ -1069,8 +1059,6 @@ static final class SizeAndTimeBoundReplayBuffer TimedNode tail; - volatile boolean done; - SizeAndTimeBoundReplayBuffer(int maxSize, long maxAge, TimeUnit unit, Scheduler scheduler) { this.maxSize = maxSize; this.maxAge = maxAge; @@ -1163,8 +1151,6 @@ public void addFinal(Object notificationLite) { size++; t.lazySet(n); // releases both the tail and size trimFinal(); - - done = true; } /** @@ -1290,18 +1276,17 @@ public void replay(ReplayDisposable rs) { Object o = n.value; - if (done) { - if (n.get() == null) { - - if (NotificationLite.isComplete(o)) { - a.onComplete(); - } else { - a.onError(NotificationLite.getError(o)); - } - rs.index = null; - rs.cancelled = true; - return; - } + if (NotificationLite.isComplete(o)) { + a.onComplete(); + rs.index = null; + rs.cancelled = true; + return; + } else + if (NotificationLite.isError(o)) { + a.onError(NotificationLite.getError(o)); + rs.index = null; + rs.cancelled = true; + return; } a.onNext((T)o); diff --git a/src/test/java/io/reactivex/rxjava3/disposables/CompositeDisposableTest.java b/src/test/java/io/reactivex/rxjava3/disposables/CompositeDisposableTest.java index a8ee27b307..3f76d283d7 100644 --- a/src/test/java/io/reactivex/rxjava3/disposables/CompositeDisposableTest.java +++ b/src/test/java/io/reactivex/rxjava3/disposables/CompositeDisposableTest.java @@ -290,7 +290,7 @@ public void tryRemoveIfNotIn() { cd.remove(cd1); cd.add(cd2); - cd.remove(cd1); // try removing agian + cd.remove(cd1); // try removing again } @Test(expected = NullPointerException.class) diff --git a/src/test/java/io/reactivex/rxjava3/exceptions/TestException.java b/src/test/java/io/reactivex/rxjava3/exceptions/TestException.java index efd568b120..7bcdd318fd 100644 --- a/src/test/java/io/reactivex/rxjava3/exceptions/TestException.java +++ b/src/test/java/io/reactivex/rxjava3/exceptions/TestException.java @@ -14,7 +14,7 @@ package io.reactivex.rxjava3.exceptions; /** - * Exception for testing if unchecked expections propagate as-is without confusing with + * Exception for testing if unchecked exceptions propagate as-is without confusing with * other type of common exceptions. */ public final class TestException extends RuntimeException { diff --git a/src/test/java/io/reactivex/rxjava3/flowable/FlowableBackpressureTests.java b/src/test/java/io/reactivex/rxjava3/flowable/FlowableBackpressureTests.java index 975ef5c149..e6b3a02c68 100644 --- a/src/test/java/io/reactivex/rxjava3/flowable/FlowableBackpressureTests.java +++ b/src/test/java/io/reactivex/rxjava3/flowable/FlowableBackpressureTests.java @@ -132,7 +132,7 @@ public void mergeSync() { assertEquals(num, ts.values().size()); // either one can starve the other, but neither should be capable of doing more than 5 batches (taking 4.1) // TODO is it possible to make this deterministic rather than one possibly starving the other? - // benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algoritms generally take a performance hit + // benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algorithms generally take a performance hit assertTrue(c1.get() < Flowable.bufferSize() * 5); assertTrue(c2.get() < Flowable.bufferSize() * 5); } @@ -154,7 +154,7 @@ public void mergeAsync() { assertEquals(num, ts.values().size()); // either one can starve the other, but neither should be capable of doing more than 5 batches (taking 4.1) // TODO is it possible to make this deterministic rather than one possibly starving the other? - // benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algoritms generally take a performance hit + // benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algorithms generally take a performance hit int max = Flowable.bufferSize() * 7; assertTrue("" + c1.get() + " >= " + max, c1.get() < max); assertTrue("" + c2.get() + " >= " + max, c2.get() < max); @@ -206,7 +206,7 @@ public void mergeAsyncThenObserveOn() { assertEquals(num, ts.values().size()); // either one can starve the other, but neither should be capable of doing more than 5 batches (taking 4.1) // TODO is it possible to make this deterministic rather than one possibly starving the other? - // benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algoritms generally take a performance hit + // benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algorithms generally take a performance hit // akarnokd => run this in a loop over 10k times and never saw values get as high as 7*SIZE, but since observeOn delays the unsubscription non-deterministically, the test will remain unreliable assertTrue(c1.get() < Flowable.bufferSize() * 7); assertTrue(c2.get() < Flowable.bufferSize() * 7); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java index ba86c69719..1a1ceca926 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java @@ -402,7 +402,8 @@ public void disposed() { @Test public void manySources() { - Flowable[] a = new Flowable[32]; + @SuppressWarnings("unchecked") + Flowable[] a = new Flowable[32]; Arrays.fill(a, Flowable.never()); a[31] = Flowable.just(1); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java index 3160f61173..4d67594a17 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java @@ -20,6 +20,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.LockSupport; import org.junit.*; import org.mockito.InOrder; @@ -28,6 +29,7 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.disposables.SequentialDisposable; import io.reactivex.rxjava3.internal.functions.Functions; import io.reactivex.rxjava3.processors.PublishProcessor; import io.reactivex.rxjava3.schedulers.*; @@ -1030,4 +1032,38 @@ public Publisher apply(Integer t) throws Exception { .to(TestHelper.testConsumer()) .assertFailureAndMessage(NullPointerException.class, "The itemDelay returned a null Publisher"); } + + @Test + public void cancelShouldPreventRandomSubsequentEmissions() { + for (int attempt = 1; attempt < 100; attempt ++) { + + SequentialDisposable disposable = new SequentialDisposable(); + ConcurrentLinkedQueue sink = new ConcurrentLinkedQueue<>(); + + disposable.replace( + Flowable.range(1, 10) + .delay(1, TimeUnit.MICROSECONDS, Schedulers.computation(), true) + .doOnNext(v -> { + if (v == 1) { + Schedulers.computation().scheduleDirect(disposable::dispose); + } + sink.offer(v); + }) + .subscribe()); + + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); + + Integer last = null; + + while (!sink.isEmpty()) { + Integer current = sink.poll(); + + if (last != null && last + 1 != current) { + fail("Emission hole: " + last + " -> " + current); + } + + last = current; + } + } + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java index 47bbd491bd..8d3e10dbd2 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java @@ -1870,7 +1870,7 @@ public Integer apply(Integer a, Integer b) throws Exception { public void firstErrorPreventsSecondSubscription() { final AtomicInteger counter = new AtomicInteger(); - List> flowableList = new ArrayList<>(); + List> flowableList = new ArrayList<>(); flowableList.add(Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter e) diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java index 0a349cd417..5ec9129d9f 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java @@ -235,7 +235,8 @@ public void ambArraySingleElement() { @Test public void manySources() { - Observable[] a = new Observable[32]; + @SuppressWarnings("unchecked") + Observable[] a = new Observable[32]; Arrays.fill(a, Observable.never()); a[31] = Observable.just(1); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferTest.java index 8264a66494..7085720f01 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferTest.java @@ -1278,9 +1278,9 @@ public Integer apply(Integer integer, Long aLong) { } }) .buffer(Observable.interval(0, 200, TimeUnit.MILLISECONDS), - new Function>() { + new Function>() { @Override - public Observable apply(Long a) { + public Observable apply(Long a) { return Observable.just(a).delay(100, TimeUnit.MILLISECONDS); } }) @@ -1301,9 +1301,9 @@ public Integer apply(Integer integer, Long aLong) { } }) .buffer(Observable.interval(0, 100, TimeUnit.MILLISECONDS), - new Function>() { + new Function>() { @Override - public Observable apply(Long a) { + public Observable apply(Long a) { return Observable.just(a).delay(200, TimeUnit.MILLISECONDS); } }) diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java index 7f47ec95d8..74d17c062b 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCacheTest.java @@ -16,10 +16,15 @@ import static org.junit.Assert.*; import static org.mockito.Mockito.*; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.lang.management.MemoryUsage; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import io.reactivex.rxjava3.observables.ConnectableObservable; import org.junit.Test; import io.reactivex.rxjava3.core.*; @@ -355,4 +360,52 @@ public void addRemoveRace() { ); } } + + @Test + public void valuesAreReclaimable() throws Exception { + ConnectableObservable source = + Observable.range(0, 200) + .map($ -> new byte[1024 * 1024]) + .publish(); + + System.out.println("Bounded Replay Leak check: Wait before GC"); + Thread.sleep(1000); + + System.out.println("Bounded Replay Leak check: GC"); + System.gc(); + + Thread.sleep(500); + + final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage(); + long initial = memHeap.getUsed(); + + System.out.printf("Bounded Replay Leak check: Starting: %.3f MB%n", initial / 1024.0 / 1024.0); + + final AtomicLong after = new AtomicLong(); + + source.cache().lastElement().subscribe(new Consumer() { + @Override + public void accept(byte[] v) throws Exception { + System.out.println("Bounded Replay Leak check: Wait before GC 2"); + Thread.sleep(1000); + + System.out.println("Bounded Replay Leak check: GC 2"); + System.gc(); + + Thread.sleep(500); + + after.set(memoryMXBean.getHeapMemoryUsage().getUsed()); + } + }); + + source.connect(); + + System.out.printf("Bounded Replay Leak check: After: %.3f MB%n", after.get() / 1024.0 / 1024.0); + + if (initial + 100 * 1024 * 1024 < after.get()) { + fail("Bounded Replay Leak check: Memory leak detected: " + (initial / 1024.0 / 1024.0) + + " -> " + after.get() / 1024.0 / 1024.0); + } + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java index ea8d51d996..778d2d4e64 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java @@ -20,6 +20,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; import org.junit.*; import org.mockito.InOrder; @@ -29,6 +30,7 @@ import io.reactivex.rxjava3.core.Observer; import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.functions.*; +import io.reactivex.rxjava3.internal.disposables.SequentialDisposable; import io.reactivex.rxjava3.internal.functions.Functions; import io.reactivex.rxjava3.observers.*; import io.reactivex.rxjava3.schedulers.*; @@ -978,4 +980,37 @@ public Observable apply(Integer t) throws Exception { .to(TestHelper.testConsumer()) .assertFailureAndMessage(NullPointerException.class, "The itemDelay returned a null ObservableSource"); } -} + + @Test + public void cancelShouldPreventRandomSubsequentEmissions() { + for (int attempt = 1; attempt < 100; attempt ++) { + + SequentialDisposable disposable = new SequentialDisposable(); + ConcurrentLinkedQueue sink = new ConcurrentLinkedQueue<>(); + + disposable.replace( + Observable.range(1, 10) + .delay(1, TimeUnit.MICROSECONDS, Schedulers.computation(), true) + .doOnNext(v -> { + if (v == 1) { + Schedulers.computation().scheduleDirect(disposable::dispose); + } + sink.offer(v); + }) + .subscribe()); + + LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1)); + + Integer last = null; + + while (!sink.isEmpty()) { + Integer current = sink.poll(); + + if (last != null && last + 1 != current) { + fail("Emission hole: " + last + " -> " + current); + } + + last = current; + } + } + }} diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java index ae7de66d6f..faebee3fa5 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java @@ -1403,7 +1403,7 @@ public Integer apply(Integer t1, Integer t2) throws Exception { public void firstErrorPreventsSecondSubscription() { final AtomicInteger counter = new AtomicInteger(); - List> observableList = new ArrayList<>(); + List> observableList = new ArrayList<>(); observableList.add(Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java index 083ee46238..8086aa7c1c 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java @@ -249,7 +249,8 @@ public void run() { @Test public void manySources() { - Single[] sources = new Single[32]; + @SuppressWarnings("unchecked") + Single[] sources = new Single[32]; Arrays.fill(sources, Single.never()); sources[31] = Single.just(31); diff --git a/src/test/java/io/reactivex/rxjava3/processors/ReplayProcessorTest.java b/src/test/java/io/reactivex/rxjava3/processors/ReplayProcessorTest.java index aa7026a9ff..35a877911b 100644 --- a/src/test/java/io/reactivex/rxjava3/processors/ReplayProcessorTest.java +++ b/src/test/java/io/reactivex/rxjava3/processors/ReplayProcessorTest.java @@ -1832,4 +1832,69 @@ public void timeAndSizeRemoveCorrectNumberOfOld() { rp.test().assertValuesOnly(4, 5); } -} + + @Test + public void terminationSubscriptionRaceUnbounded() throws Throwable { + for (int i = 1; i <= 10000; i++) { + ReplayProcessor source = ReplayProcessor.create(); + PublishProcessor sink = PublishProcessor.create(); + TestSubscriber subscriber = sink.test(); + Schedulers.computation().scheduleDirect(() -> { + // issue signals to the source in adherence to the reactive streams specification + source.onSubscribe(new BooleanSubscription()); + source.onNext("hello"); + source.onNext("world"); + source.onComplete(); + }); + Schedulers.computation().scheduleDirect(() -> { + // connect the source to the sink in parallel with the signals issued to the source + // note the cast() operator, which is here to detect non-String escapees + source.cast(String.class).subscribe(sink); + }); + subscriber.await().assertValues("hello", "world").assertComplete(); + } + } + + @Test + public void terminationSubscriptionRaceSizeBound() throws Throwable { + for (int i = 1; i <= 10000; i++) { + ReplayProcessor source = ReplayProcessor.createWithSize(20); + PublishProcessor sink = PublishProcessor.create(); + TestSubscriber subscriber = sink.test(); + Schedulers.computation().scheduleDirect(() -> { + // issue signals to the source in adherence to the reactive streams specification + source.onSubscribe(new BooleanSubscription()); + source.onNext("hello"); + source.onNext("world"); + source.onComplete(); + }); + Schedulers.computation().scheduleDirect(() -> { + // connect the source to the sink in parallel with the signals issued to the source + // note the cast() operator, which is here to detect non-String escapees + source.cast(String.class).subscribe(sink); + }); + subscriber.await().assertValues("hello", "world").assertComplete(); + } + } + + @Test + public void terminationSubscriptionRaceTimeBound() throws Throwable { + for (int i = 1; i <= 10000; i++) { + ReplayProcessor source = ReplayProcessor.createWithTime(20, TimeUnit.MINUTES, Schedulers.computation()); + PublishProcessor sink = PublishProcessor.create(); + TestSubscriber subscriber = sink.test(); + Schedulers.computation().scheduleDirect(() -> { + // issue signals to the source in adherence to the reactive streams specification + source.onSubscribe(new BooleanSubscription()); + source.onNext("hello"); + source.onNext("world"); + source.onComplete(); + }); + Schedulers.computation().scheduleDirect(() -> { + // connect the source to the sink in parallel with the signals issued to the source + // note the cast() operator, which is here to detect non-String escapees + source.cast(String.class).subscribe(sink); + }); + subscriber.await().assertValues("hello", "world").assertComplete(); + } + }} diff --git a/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java b/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java index f4b5f82ec4..074ac8039a 100644 --- a/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java +++ b/src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java @@ -965,4 +965,149 @@ public void run() { exec.shutdown(); } } + + public static class TrackInterruptScheduledExecutor extends ScheduledThreadPoolExecutor { + + public final AtomicBoolean interruptReceived = new AtomicBoolean(); + + public TrackInterruptScheduledExecutor() { + super(10); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return new TrackingScheduledFuture(super.schedule(callable, delay, unit)); + } + + class TrackingScheduledFuture implements ScheduledFuture { + + ScheduledFuture original; + + TrackingScheduledFuture(ScheduledFuture original) { + this.original = original; + } + + @Override + public long getDelay(TimeUnit unit) { + return original.getDelay(unit); + } + + @Override + public int compareTo(Delayed o) { + return original.compareTo(o); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (mayInterruptIfRunning) { + interruptReceived.set(true); + } + return original.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return original.isCancelled(); + } + + @Override + public boolean isDone() { + return original.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return original.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return get(timeout, unit); + } + } + } + + @Test + public void noInterruptBeforeRunningDelayedWorker() throws Throwable { + TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor(); + + try { + Scheduler sch = Schedulers.from(exec, false); + + Worker worker = sch.createWorker(); + + Disposable d = worker.schedule(() -> { }, 1, TimeUnit.SECONDS); + + d.dispose(); + + int i = 150; + + while (i-- > 0) { + assertFalse("Task interrupt detected", exec.interruptReceived.get()); + Thread.sleep(10); + } + + } finally { + exec.shutdownNow(); + } + } + + @Test + public void hasInterruptBeforeRunningDelayedWorker() throws Throwable { + TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor(); + + try { + Scheduler sch = Schedulers.from(exec, true); + + Worker worker = sch.createWorker(); + + Disposable d = worker.schedule(() -> { }, 1, TimeUnit.SECONDS); + + d.dispose(); + + Thread.sleep(100); + assertTrue("Task interrupt detected", exec.interruptReceived.get()); + + } finally { + exec.shutdownNow(); + } + } + + @Test + public void noInterruptAfterRunningDelayedWorker() throws Throwable { + TrackInterruptScheduledExecutor exec = new TrackInterruptScheduledExecutor(); + + try { + Scheduler sch = Schedulers.from(exec, false); + + Worker worker = sch.createWorker(); + AtomicBoolean taskRun = new AtomicBoolean(); + + Disposable d = worker.schedule(() -> { + taskRun.set(true); + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + exec.interruptReceived.set(true); + } + }, 100, TimeUnit.MILLISECONDS); + + Thread.sleep(150); + ; + d.dispose(); + + int i = 50; + + while (i-- > 0) { + assertFalse("Task interrupt detected", exec.interruptReceived.get()); + Thread.sleep(10); + } + + assertTrue("Task run at all", taskRun.get()); + + } finally { + exec.shutdownNow(); + } + } } diff --git a/src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java b/src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java index e752278b2f..8417b53081 100644 --- a/src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java +++ b/src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java @@ -1378,4 +1378,70 @@ public void timeAndSizeRemoveCorrectNumberOfOld() { rs.test().assertValuesOnly(4, 5); } + + @Test + public void terminationSubscriptionRaceUnbounded() throws Throwable { + for (int i = 1; i <= 10000; i++) { + Subject source = ReplaySubject.create(); + Subject sink = PublishSubject.create(); + TestObserver observer = sink.test(); + Schedulers.computation().scheduleDirect(() -> { + // issue signals to the source in adherence to the reactive streams specification + source.onSubscribe(Disposable.empty()); + source.onNext("hello"); + source.onNext("world"); + source.onComplete(); + }); + Schedulers.computation().scheduleDirect(() -> { + // connect the source to the sink in parallel with the signals issued to the source + // note the cast() operator, which is here to detect non-String escapees + source.cast(String.class).subscribe(sink); + }); + observer.await().assertValues("hello", "world").assertComplete(); + } + } + + @Test + public void terminationSubscriptionRaceSizeBound() throws Throwable { + for (int i = 1; i <= 10000; i++) { + Subject source = ReplaySubject.createWithSize(20); + Subject sink = PublishSubject.create(); + TestObserver observer = sink.test(); + Schedulers.computation().scheduleDirect(() -> { + // issue signals to the source in adherence to the reactive streams specification + source.onSubscribe(Disposable.empty()); + source.onNext("hello"); + source.onNext("world"); + source.onComplete(); + }); + Schedulers.computation().scheduleDirect(() -> { + // connect the source to the sink in parallel with the signals issued to the source + // note the cast() operator, which is here to detect non-String escapees + source.cast(String.class).subscribe(sink); + }); + observer.await().assertValues("hello", "world").assertComplete(); + } + } + + @Test + public void terminationSubscriptionRaceTimeBound() throws Throwable { + for (int i = 1; i <= 10000; i++) { + Subject source = ReplaySubject.createWithTime(20, TimeUnit.MINUTES, Schedulers.computation()); + Subject sink = PublishSubject.create(); + TestObserver observer = sink.test(); + Schedulers.computation().scheduleDirect(() -> { + // issue signals to the source in adherence to the reactive streams specification + source.onSubscribe(Disposable.empty()); + source.onNext("hello"); + source.onNext("world"); + source.onComplete(); + }); + Schedulers.computation().scheduleDirect(() -> { + // connect the source to the sink in parallel with the signals issued to the source + // note the cast() operator, which is here to detect non-String escapees + source.cast(String.class).subscribe(sink); + }); + observer.await().assertValues("hello", "world").assertComplete(); + } + } }