Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Sending notifications as Flow more reliably
  • Loading branch information
philips77 committed Feb 4, 2025
commit 0ee79e6be449824f14f9be9b8e1007be40cc5f82
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

package no.nordicsemi.android.ble.ktx

import android.util.Log
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.onFailure
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.callbackFlow
import no.nordicsemi.android.ble.ValueChangedCallback
import no.nordicsemi.android.ble.callback.profile.ProfileReadResponse
Expand All @@ -17,6 +21,11 @@ import no.nordicsemi.android.ble.response.ReadResponse
* Usage:
*
* val hrmMeasurementsData = setNotificationCallback(hrmCharacteristic).asFlow() // Flow<Data>
*
* Use the [buffer] operator on the resulting flow to specify a user-defined value and to control
* what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.
*
* val hrmMeasurementsData = setNotificationCallback(hrmCharacteristic).asFlow().buffer()
* @return The flow.
* @since 2.3.0
*/
Expand All @@ -25,7 +34,8 @@ fun ValueChangedCallback.asFlow(): Flow<Data> = callbackFlow {
// Make sure the callbacks are called without unnecessary delay.
setHandler(null)
with { _, data ->
trySend(data)
trySendBlocking(data)
.onFailure { t -> Log.w("ValueChangeCallback", "Sending data to Flow failed with: $t") }
}
awaitClose {
// There's no way to unregister the callback from here.
Expand All @@ -41,6 +51,9 @@ fun ValueChangedCallback.asFlow(): Flow<Data> = callbackFlow {
* val hrmMeasurementsData: Flow<HeartRateMeasurementResponse> =
* setNotificationCallback(hrmCharacteristic)
* .asResponseFlow()
*
* Use the [buffer] operator on the resulting flow to specify a user-defined value and to control
* what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.
* @return The flow.
* @since 2.4.0
*/
Expand All @@ -49,7 +62,10 @@ inline fun <reified T: ReadResponse> ValueChangedCallback.asResponseFlow(): Flow
// Make sure the callbacks are called without unnecessary delay.
setHandler(null)
with { device, data ->
trySend(T::class.java.getDeclaredConstructor().newInstance().apply { onDataReceived(device, data) })
val response = T::class.java.getDeclaredConstructor().newInstance()
.apply { onDataReceived(device, data) }
trySendBlocking(response)
.onFailure { t -> Log.w("ValueChangeCallback", "Sending response to Flow failed with: $t") }
}
awaitClose {
// There's no way to unregister the callback from here.
Expand All @@ -66,6 +82,9 @@ inline fun <reified T: ReadResponse> ValueChangedCallback.asResponseFlow(): Flow
* val hrmMeasurementsData: Flow<HeartRateMeasurementResponse> =
* setNotificationCallback(hrmCharacteristic)
* .asValidResponseFlow()
*
* Use the [buffer] operator on the resulting flow to specify a user-defined value and to control
* what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.
* @return The flow.
* @since 2.4.0
*/
Expand All @@ -74,10 +93,12 @@ inline fun <reified T: ProfileReadResponse> ValueChangedCallback.asValidResponse
// Make sure the callbacks are called without unnecessary delay.
setHandler(null)
with { device, data ->
T::class.java.getDeclaredConstructor().newInstance()
val response = T::class.java.getDeclaredConstructor().newInstance()
.apply { onDataReceived(device, data) }
.takeIf { it.isValid }
?.let { trySend(it) }
if (response.isValid) {
trySendBlocking(response)
.onFailure { t -> Log.w("ValueChangeCallback", "Sending response to Flow failed with: $t") }
}
}
awaitClose {
// There's no way to unregister the callback from here.
Expand Down