Skip to content

Commit 7ed1407

Browse files
committed
#103 -- Provide error in unsubscribe pub/sub handler if available
1 parent 9da5773 commit 7ed1407

File tree

6 files changed

+121
-50
lines changed

6 files changed

+121
-50
lines changed

Sources/RediStack/ChannelHandlers/RedisPubSubHandler.swift

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the RediStack open source project
44
//
5-
// Copyright (c) 2020 RediStack project authors
5+
// Copyright (c) 2020-2022 RediStack project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -24,18 +24,35 @@ import NIO
2424
/// - message: The message data that was received from the `publisher`.
2525
public typealias RedisSubscriptionMessageReceiver = (_ publisher: RedisChannelName, _ message: RESPValue) -> Void
2626

27-
/// A closure handler invoked for Pub/Sub subscription changes.
27+
/// The details of the subscription change.
28+
/// - Parameters:
29+
/// - subscriptionKey: The subscribed channel or pattern that had its subscription status changed.
30+
/// - currentSubscriptionCount: The current total number of subscriptions the connection has.
31+
public typealias RedisSubscriptionChangeDetails = (subscriptionKey: String, currentSubscriptionCount: Int)
32+
33+
/// A closure handler invoked for Pub/Sub subscribe commands.
2834
///
2935
/// This closure will be invoked only *once* for each individual channel or pattern that is having its subscription changed,
30-
/// even if it was done as a single PSUBSCRIBE, SUBSCRIBE, PUNSUBSCRIBE, or UNSUBSCRIBE command.
36+
/// even if it was done as a single PSUBSCRIBE or SUBSCRIBE command.
37+
/// - Warning: The receiver is called on the same `NIO.EventLoop` that processed the message.
38+
///
39+
/// If you are doing non-trivial work in response to PubSub messages, it is **highly recommended** that the work be dispatched to another thread
40+
/// so as to not block further messages from being processed.
41+
/// - Parameter details: The details of the subscription.
42+
public typealias RedisSubscribeHandler = (_ details: RedisSubscriptionChangeDetails) -> Void
43+
44+
/// A closure handler invoked for Pub/Sub unsubscribe commands.
45+
///
46+
/// This closure will be invoked only *once* for each individual channel or pattern that is having its subscription changed,
47+
/// even if it was done as a single PUNSUBSCRIBE or UNSUBSCRIBE command.
3148
/// - Warning: The receiver is called on the same `NIO.EventLoop` that processed the message.
3249
///
3350
/// If you are doing non-trivial work in response to PubSub messages, it is **highly recommended** that the work be dispatched to another thread
3451
/// so as to not block further messages from being processed.
3552
/// - Parameters:
36-
/// - subscriptionKey: The subscribed channel or pattern that had its subscription status changed.
37-
/// - currentSubscriptionCount: The current total number of subscriptions the connection has.
38-
public typealias RedisSubscriptionChangeHandler = (_ subscriptionKey: String, _ currentSubscriptionCount: Int) -> Void
53+
/// - details: The details of the subscription.
54+
/// - error: The error triggering the unsubscribe, if any. If this is `nil`, then the subscription was expected as a response to a user-initiated unsubscribe.
55+
public typealias RedisUnsubscribeHandler = (_ details: RedisSubscriptionChangeDetails, _ error: Error?) -> Void
3956

4057
/// A list of patterns or channels that a Pub/Sub subscription change is targetting.
4158
///
@@ -146,7 +163,7 @@ extension RedisPubSubHandler {
146163

147164
guard let subscription = self.subscriptions[prefixedKey] else { return }
148165

149-
subscription.onSubscribe?(subscriptionKey, subscriptionCount)
166+
subscription.onSubscribe?((subscriptionKey, subscriptionCount))
150167
subscription.onSubscribe = nil // nil to free memory
151168
self.subscriptions[prefixedKey] = subscription
152169

@@ -162,7 +179,7 @@ extension RedisPubSubHandler {
162179
let prefixedKey = self.prefixKey(subscriptionKey, with: keyPrefix)
163180
guard let subscription = self.subscriptions.removeValue(forKey: prefixedKey) else { return }
164181

165-
subscription.onUnsubscribe?(subscriptionKey, subscriptionCount)
182+
subscription.onUnsubscribe?((subscriptionKey, subscriptionCount), nil)
166183
subscription.type.gauge.decrement()
167184

168185
switch self.pendingUnsubscribes.removeValue(forKey: prefixedKey) {
@@ -208,8 +225,8 @@ extension RedisPubSubHandler {
208225
public func addSubscription(
209226
for target: RedisSubscriptionTarget,
210227
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
211-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
212-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
228+
onSubscribe subscribeHandler: RedisSubscribeHandler?,
229+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?
213230
) -> EventLoopFuture<Int> {
214231
guard self.eventLoop.inEventLoop else {
215232
return self.eventLoop.flatSubmit {
@@ -481,7 +498,7 @@ extension RedisPubSubHandler: ChannelInboundHandler {
481498
let receivers = self.subscriptions
482499
self.subscriptions.removeAll()
483500
receivers.forEach {
484-
$0.value.onUnsubscribe?($0.key, 0)
501+
$0.value.onUnsubscribe?(($0.key, 0), error)
485502
$0.value.type.gauge.decrement()
486503
}
487504
}
@@ -521,14 +538,14 @@ extension RedisPubSubHandler {
521538
fileprivate final class Subscription {
522539
let type: SubscriptionType
523540
let onMessage: RedisSubscriptionMessageReceiver
524-
var onSubscribe: RedisSubscriptionChangeHandler? // will be set to nil after first call
525-
let onUnsubscribe: RedisSubscriptionChangeHandler?
541+
var onSubscribe: RedisSubscribeHandler? // will be set to nil after first call
542+
let onUnsubscribe: RedisUnsubscribeHandler?
526543

527544
init(
528545
type: SubscriptionType,
529546
messageReceiver: @escaping RedisSubscriptionMessageReceiver,
530-
subscribeHandler: RedisSubscriptionChangeHandler?,
531-
unsubscribeHandler: RedisSubscriptionChangeHandler?
547+
subscribeHandler: RedisSubscribeHandler?,
548+
unsubscribeHandler: RedisUnsubscribeHandler?
532549
) {
533550
self.type = type
534551
self.onMessage = messageReceiver

Sources/RediStack/RedisClient.swift

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ public protocol RedisClient {
7171
eventLoop: EventLoop?,
7272
logger: Logger?,
7373
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
74-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
75-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
74+
onSubscribe subscribeHandler: RedisSubscribeHandler?,
75+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?
7676
) -> EventLoopFuture<Void>
7777

7878
/// Subscribes the client to the specified Redis channel name patterns, invoking the provided message receiver each time a message is published to
@@ -100,8 +100,8 @@ public protocol RedisClient {
100100
eventLoop: EventLoop?,
101101
logger: Logger?,
102102
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
103-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
104-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
103+
onSubscribe subscribeHandler: RedisSubscribeHandler?,
104+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?
105105
) -> EventLoopFuture<Void>
106106

107107
/// Unsubscribes the client from a specific Redis channel from receiving any future published messages.
@@ -194,8 +194,8 @@ extension RedisClient {
194194
eventLoop: EventLoop? = nil,
195195
logger: Logger? = nil,
196196
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
197-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
198-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil
197+
onSubscribe subscribeHandler: RedisSubscribeHandler? = nil,
198+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler? = nil
199199
) -> EventLoopFuture<Void> {
200200
return self.subscribe(to: channels, eventLoop: eventLoop, logger: logger, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
201201
}
@@ -205,8 +205,8 @@ extension RedisClient {
205205
eventLoop: EventLoop? = nil,
206206
logger: Logger? = nil,
207207
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
208-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
209-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil
208+
onSubscribe subscribeHandler: RedisSubscribeHandler? = nil,
209+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler? = nil
210210
) -> EventLoopFuture<Void> {
211211
return self.subscribe(to: channels, eventLoop: eventLoop, logger: logger, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
212212
}
@@ -216,8 +216,8 @@ extension RedisClient {
216216
eventLoop: EventLoop? = nil,
217217
logger: Logger? = nil,
218218
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
219-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
220-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil
219+
onSubscribe subscribeHandler: RedisSubscribeHandler? = nil,
220+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler? = nil
221221
) -> EventLoopFuture<Void> {
222222
return self.psubscribe(to: patterns, eventLoop: eventLoop, logger: logger, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
223223
}
@@ -227,8 +227,8 @@ extension RedisClient {
227227
eventLoop: EventLoop? = nil,
228228
logger: Logger? = nil,
229229
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
230-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
231-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil
230+
onSubscribe subscribeHandler: RedisSubscribeHandler? = nil,
231+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler? = nil
232232
) -> EventLoopFuture<Void> {
233233
return self.psubscribe(to: patterns, eventLoop: eventLoop, logger: logger, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
234234
}

Sources/RediStack/RedisConnection.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -381,8 +381,8 @@ extension RedisConnection {
381381
eventLoop: EventLoop? = nil,
382382
logger: Logger? = nil,
383383
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
384-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
385-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
384+
onSubscribe subscribeHandler: RedisSubscribeHandler?,
385+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?
386386
) -> EventLoopFuture<Void> {
387387
return self._subscribe(.channels(channels), receiver, subscribeHandler, unsubscribeHandler, eventLoop, logger)
388388
}
@@ -392,17 +392,17 @@ extension RedisConnection {
392392
eventLoop: EventLoop? = nil,
393393
logger: Logger? = nil,
394394
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
395-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler? = nil,
396-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler? = nil
395+
onSubscribe subscribeHandler: RedisSubscribeHandler? = nil,
396+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler? = nil
397397
) -> EventLoopFuture<Void> {
398398
return self._subscribe(.patterns(patterns), receiver, subscribeHandler, unsubscribeHandler, eventLoop, logger)
399399
}
400400

401401
private func _subscribe(
402402
_ target: RedisSubscriptionTarget,
403403
_ receiver: @escaping RedisSubscriptionMessageReceiver,
404-
_ onSubscribe: RedisSubscriptionChangeHandler?,
405-
_ onUnsubscribe: RedisSubscriptionChangeHandler?,
404+
_ onSubscribe: RedisSubscribeHandler?,
405+
_ onUnsubscribe: RedisUnsubscribeHandler?,
406406
_ eventLoop: EventLoop?,
407407
_ logger: Logger?
408408
) -> EventLoopFuture<Void> {

Sources/RediStack/RedisConnectionPool.swift

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -363,8 +363,8 @@ extension RedisConnectionPool: RedisClient {
363363
eventLoop: EventLoop? = nil,
364364
logger: Logger? = nil,
365365
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
366-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
367-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
366+
onSubscribe subscribeHandler: RedisSubscribeHandler?,
367+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?
368368
) -> EventLoopFuture<Void> {
369369
return self._subscribe(
370370
using: {
@@ -388,8 +388,8 @@ extension RedisConnectionPool: RedisClient {
388388
eventLoop: EventLoop? = nil,
389389
logger: Logger? = nil,
390390
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
391-
onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?,
392-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?
391+
onSubscribe subscribeHandler: RedisSubscribeHandler?,
392+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?
393393
) -> EventLoopFuture<Void> {
394394
return self._subscribe(
395395
using: {
@@ -433,8 +433,8 @@ extension RedisConnectionPool: RedisClient {
433433
}
434434

435435
private func _subscribe(
436-
using operation: @escaping (RedisConnection, @escaping RedisSubscriptionChangeHandler, Logger) -> EventLoopFuture<Void>,
437-
onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?,
436+
using operation: @escaping (RedisConnection, @escaping RedisUnsubscribeHandler, Logger) -> EventLoopFuture<Void>,
437+
onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?,
438438
eventLoop: EventLoop?,
439439
taskLogger: Logger?
440440
) -> EventLoopFuture<Void> {
@@ -446,11 +446,11 @@ extension RedisConnectionPool: RedisClient {
446446
self.pubsubConnection = connection
447447
}
448448

449-
let onUnsubscribe: RedisSubscriptionChangeHandler = { channelName, subCount in
450-
defer { unsubscribeHandler?(channelName, subCount) }
449+
let onUnsubscribe: RedisUnsubscribeHandler = { subscriptionDetails, error in
450+
defer { unsubscribeHandler?(subscriptionDetails, error) }
451451

452452
guard
453-
subCount == 0,
453+
subscriptionDetails.currentSubscriptionCount == 0,
454454
let connection = self.pubsubConnection
455455
else { return }
456456

Sources/RediStack/RedisLogging.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,12 @@ internal struct CustomLoggerRedisClient<Client: RedisClient>: RedisClient {
109109
return self.client.punsubscribe(from: patterns, eventLoop: eventLoop, logger: logger)
110110
}
111111

112-
internal func subscribe(to channels: [RedisChannelName], eventLoop: EventLoop?, logger: Logger?, messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver, onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?, onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?) -> EventLoopFuture<Void> {
112+
internal func subscribe(to channels: [RedisChannelName], eventLoop: EventLoop?, logger: Logger?, messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver, onSubscribe subscribeHandler: RedisSubscribeHandler?, onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?) -> EventLoopFuture<Void> {
113113
let logger = logger ?? self.defaultLogger
114114
return self.client.subscribe(to: channels, eventLoop: eventLoop, logger: logger, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
115115
}
116116

117-
internal func psubscribe(to patterns: [String], eventLoop: EventLoop?, logger: Logger?, messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver, onSubscribe subscribeHandler: RedisSubscriptionChangeHandler?, onUnsubscribe unsubscribeHandler: RedisSubscriptionChangeHandler?) -> EventLoopFuture<Void> {
117+
internal func psubscribe(to patterns: [String], eventLoop: EventLoop?, logger: Logger?, messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver, onSubscribe subscribeHandler: RedisSubscribeHandler?, onUnsubscribe unsubscribeHandler: RedisUnsubscribeHandler?) -> EventLoopFuture<Void> {
118118
let logger = logger ?? self.defaultLogger
119119
return self.client.psubscribe(to: patterns, eventLoop: eventLoop, logger: logger, messageReceiver: receiver, onSubscribe: subscribeHandler, onUnsubscribe: unsubscribeHandler)
120120
}

Tests/RediStackIntegrationTests/Commands/PubSubCommandsTests.swift

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the RediStack open source project
44
//
5-
// Copyright (c) 2020 RediStack project authors
5+
// Copyright (c) 2020-2022 RediStack project authors
66
// Licensed under Apache License v2.0
77
//
88
// See LICENSE.txt for license information
@@ -12,7 +12,9 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15-
import RediStack
15+
import NIO
16+
import NIOEmbedded
17+
@testable import RediStack
1618
import RediStackTestUtils
1719
import XCTest
1820

@@ -40,8 +42,12 @@ final class RedisPubSubCommandsTests: RediStackIntegrationTestCase {
4042
guard $0 == #function, $1 == 1 else { return }
4143
subscribeExpectation.fulfill()
4244
},
43-
onUnsubscribe: {
44-
guard $0 == #function, $1 == 0 else { return }
45+
onUnsubscribe: { details, error in
46+
guard
47+
error == nil,
48+
details.subscriptionKey == #function,
49+
details.currentSubscriptionCount == 0
50+
else { return }
4551
unsubscribeExpectation.fulfill()
4652
}
4753
).wait()
@@ -294,8 +300,12 @@ final class RedisPubSubCommandsPoolTests: RediStackConnectionPoolIntegrationTest
294300
guard $0 == #function, $1 == 1 else { return }
295301
subscribeExpectation.fulfill()
296302
},
297-
onUnsubscribe: {
298-
guard $0 == #function, $1 == 0 else { return }
303+
onUnsubscribe: { details, error in
304+
guard
305+
error == nil,
306+
details.subscriptionKey == #function,
307+
details.currentSubscriptionCount == 0
308+
else { return }
299309
unsubscribeExpectation.fulfill()
300310
}
301311
).wait()
@@ -341,3 +351,47 @@ final class RedisPubSubCommandsPoolTests: RediStackConnectionPoolIntegrationTest
341351
XCTAssertEqual(self.pool.leasedConnectionCount, 0)
342352
}
343353
}
354+
355+
// MARK: - #103 tests
356+
357+
extension RedisPubSubCommandsTests {
358+
func test_pubsub_calls_unsubscribe_whenUnexpectedClose() throws {
359+
let channel = EmbeddedChannel()
360+
try channel
361+
.addBaseRedisHandlers()
362+
.wait()
363+
364+
let subscribeExpectation = self.expectation(description: "should see subscribe")
365+
let unsubscribeExpectation = self.expectation(description: "should see unsubscribe")
366+
367+
let connection = RedisConnection(configuredRESPChannel: channel, defaultLogger: .init(label: ""))
368+
let subscribeFuture = connection
369+
.subscribe(
370+
to: [.init(#function)],
371+
messageReceiver: { _, _ in },
372+
onSubscribe: { _, _ in subscribeExpectation.fulfill() },
373+
onUnsubscribe: { _, error in
374+
guard error != nil else { return }
375+
unsubscribeExpectation.fulfill()
376+
}
377+
)
378+
379+
// mimics a successful subscription response from the server
380+
let allocator = ByteBufferAllocator()
381+
var buffer = allocator.buffer(capacity: 300)
382+
buffer.writeRESPValue(.array([
383+
.init(bulk: "subscribe"),
384+
.init(bulk: "\(#function)"),
385+
.integer(1)
386+
]))
387+
try channel.writeInbound(buffer)
388+
389+
// lets the initial subscription work finish
390+
try subscribeFuture.wait()
391+
392+
// 'unexpected' close, should trigger expectations
393+
try channel.close().wait()
394+
395+
self.waitForExpectations(timeout: 0.5)
396+
}
397+
}

0 commit comments

Comments
 (0)