Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
OSConsistencyManager & related classes
Motivation: Manages (kafka) offsets that function as read-your-write tokens for more accurate segment membership calculation. The manager works based on conditions & offsets.

Offsets are stored in a nested map indexed by a unique id (e.g. `onesignalId`) and offset key (e.g. `USER_UPDATE`).

This allows us to track offsets on a per-user basis (e.g. handle switching users).

Conditions work by creating a blocking mechanism with customizable offset retrieval until a pre-defined condition is met (e.g. at least two specific offsets are available).

OSCondition interface: allows extensibility for future applications to control offset blocking mechanism in consistency use-cases.
  • Loading branch information
Rodrigo Gomez Palacio committed Oct 23, 2024
commit eb459463cac9c368903cf030d9e3bba1050a617c
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//
// OSCondition.swift
// OneSignalOSCore
//
// Created by Rodrigo Gomez-Palacio on 9/10/24.
// Copyright © 2024 OneSignal. All rights reserved.
//

import Foundation

@objc public protocol OSCondition: AnyObject {
// Each conforming class will provide its unique ID
var conditionId: String { get }
func isMet(indexedTokens: [String: [NSNumber: OSReadYourWriteData]]) -> Bool
func getNewestToken(indexedTokens: [String: [NSNumber: OSReadYourWriteData]]) -> OSReadYourWriteData?
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//
// OSConsistencyKeyEnum.swift
// OneSignalOSCore
//
// Created by Rodrigo Gomez-Palacio on 9/10/24.
// Copyright © 2024 OneSignal. All rights reserved.
//

import Foundation

// Protocol for enums with Int raw values.
public protocol OSConsistencyKeyEnum: RawRepresentable where RawValue == Int { }
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//
// OSConsistencyManager.swift
// OneSignalOSCore
//
// Created by Rodrigo Gomez-Palacio on 9/10/24.
// Copyright © 2024 OneSignal. All rights reserved.
//

import Foundation

@objc public class OSConsistencyManager: NSObject {
// Singleton instance
@objc public static let shared = OSConsistencyManager()

private let queue = DispatchQueue(label: "com.consistencyManager.queue")
private var indexedTokens: [String: [NSNumber: OSReadYourWriteData]] = [:]
private var indexedConditions: [String: [(OSCondition, DispatchSemaphore)]] = [:] // Index conditions by condition id

// Private initializer to prevent multiple instances
private override init() {}

// Used for testing
public func reset() {
indexedTokens = [:]
indexedConditions = [:]
}

// Function to set the token in a thread-safe manner
public func setRywTokenAndDelay(id: String, key: any OSConsistencyKeyEnum, value: OSReadYourWriteData) {
queue.sync {
let nsKey = NSNumber(value: key.rawValue)
if self.indexedTokens[id] == nil {
self.indexedTokens[id] = [:]
}
self.indexedTokens[id]?[nsKey] = value
self.checkConditionsAndComplete(forId: id) // Only check conditions for this specific ID
}
}

// Register a condition and block the caller until the condition is met
@objc public func getRywTokenFromAwaitableCondition(_ condition: OSCondition, forId id: String) -> OSReadYourWriteData? {
let semaphore = DispatchSemaphore(value: 0)
queue.sync {
if self.conditions[id] == nil {
self.conditions[id] = []
}
self.conditions[id]?.append((condition, semaphore))
self.checkConditionsAndComplete(forId: id)
}
semaphore.wait() // Block until the condition is met
return queue.sync {
return condition.getNewestToken(indexedTokens: self.indexedTokens)
}
}

// Method to resolve conditions by condition ID (e.g. OSIamFetchReadyCondition.ID)
@objc public func resolveConditionsWithID(id: String) {
guard let conditionList = conditions[id] else { return }
var completedConditions: [(OSCondition, DispatchSemaphore)] = []
for (condition, semaphore) in conditionList {
if (condition.conditionId == id) {
semaphore.signal()
completedConditions.append((condition, semaphore))
}
}
conditions[id]?.removeAll { condition, semaphore in
completedConditions.contains(where: { $0.0 === condition && $0.1 == semaphore })
}
}

// Private method to check conditions for a specific id (unique ID like onesignalId)
private func checkConditionsAndComplete(forId id: String) {
guard let conditionList = conditions[id] else { return }
var completedConditions: [(OSCondition, DispatchSemaphore)] = []
for (condition, semaphore) in conditionList {
if condition.isMet(indexedTokens: indexedTokens) {
print("Condition met for id: \(id)")
semaphore.signal()
completedConditions.append((condition, semaphore))
} else {
print("Condition not met for id: \(id)")
}
}
conditions[id]?.removeAll { condition, semaphore in
completedConditions.contains(where: { $0.0 === condition && $0.1 == semaphore })
}
}
}
Loading