-
Notifications
You must be signed in to change notification settings - Fork 57
Strict concurrency for tests #198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -394,8 +394,10 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| let errorLogger = ErrorLoggingHandler() | ||
|
|
||
| let harness = self.harness { channel, _ in | ||
| channel.pipeline.addHandler(errorLogger).flatMap { | ||
| channel.eventLoop.makeFailedFuture(MultiplexerTestError.rejected) | ||
| channel.eventLoop.makeCompletedFuture { | ||
| let sync = channel.pipeline.syncOperations | ||
| try sync.addHandler(errorLogger) | ||
glbrntt marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| throw MultiplexerTestError.rejected | ||
| } | ||
| } | ||
| defer { | ||
|
|
@@ -627,9 +629,10 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| buffer.writeString("Hello from the unit tests!") | ||
|
|
||
| harness.multiplexer.createChildChannel(channelType: .session) { channel, _ in | ||
| channel.write(SSHChannelData(type: .channel, data: .byteBuffer(buffer)), promise: nil) | ||
| channel.write(SSHChannelData(type: .stdErr, data: .byteBuffer(buffer)), promise: nil) | ||
| channel.flush() | ||
| let sync = channel.pipeline.syncOperations | ||
| sync.write(NIOAny(SSHChannelData(type: .channel, data: .byteBuffer(buffer))), promise: nil) | ||
| sync.write(NIOAny(SSHChannelData(type: .stdErr, data: .byteBuffer(buffer))), promise: nil) | ||
| sync.flush() | ||
| return channel.eventLoop.makeSucceededFuture(()) | ||
| } | ||
|
|
||
|
|
@@ -709,8 +712,8 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
|
|
||
| // Let's create a channel. | ||
| harness.multiplexer.createChildChannel(channelType: .session) { channel, _ in | ||
| channel.setOption(ChannelOptions.autoRead, value: false).flatMap { | ||
| channel.pipeline.addHandler(readRecorder.value) | ||
| channel.setOption(ChannelOptions.autoRead, value: false).flatMapThrowing { | ||
| try channel.pipeline.syncOperations.addHandler(readRecorder.value) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -774,7 +777,9 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| var childChannels: [Channel] = [] | ||
| let harness = self.harness { channel, _ in | ||
| childChannels.append(channel) | ||
| return channel.pipeline.addHandler(ChannelInactiveRecorder()) | ||
| return channel.eventLoop.makeCompletedFuture { | ||
| try channel.pipeline.syncOperations.addHandler(ChannelInactiveRecorder()) | ||
| } | ||
| } | ||
| defer { | ||
| harness.finish() | ||
|
|
@@ -807,7 +812,9 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| var childChannels: [Channel] = [] | ||
| let harness = self.harness { channel, _ in | ||
| childChannels.append(channel) | ||
| return channel.pipeline.addHandler(ChannelInactiveRecorder()) | ||
| return channel.eventLoop.makeCompletedFuture { | ||
| try channel.pipeline.syncOperations.addHandler(ChannelInactiveRecorder()) | ||
| } | ||
| } | ||
| defer { | ||
| harness.finish() | ||
|
|
@@ -836,7 +843,9 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| // All channels are already inactive, but still have their inactive recorder (and so have not seen an event loop tick). | ||
| XCTAssertTrue(childChannels.allSatisfy { !$0.isActive }) | ||
| XCTAssertTrue( | ||
| childChannels.allSatisfy { (try? $0.pipeline.handler(type: ChannelInactiveRecorder.self).wait()) != nil } | ||
| childChannels.allSatisfy { | ||
| (try? $0.pipeline.syncOperations.handler(type: ChannelInactiveRecorder.self)) != nil | ||
| } | ||
| ) | ||
|
|
||
| // Claim the parent has gone inactive. All should go inactive. | ||
|
|
@@ -846,7 +855,9 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| harness.eventLoop.run() | ||
| XCTAssertTrue(childChannels.allSatisfy { !$0.isActive }) | ||
| XCTAssertTrue( | ||
| childChannels.allSatisfy { (try? $0.pipeline.handler(type: ChannelInactiveRecorder.self).wait()) == nil } | ||
| childChannels.allSatisfy { | ||
| (try? $0.pipeline.syncOperations.handler(type: ChannelInactiveRecorder.self)) == nil | ||
| } | ||
| ) | ||
|
|
||
| // And they didn't say anything. | ||
|
|
@@ -886,8 +897,13 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| // Issue a write to the child. | ||
| var bytes = channel.allocator.buffer(capacity: 1024) | ||
| bytes.writeString("Hello from the unit tests") | ||
| XCTAssertThrowsError(try channel.writeAndFlush(SSHChannelData(type: .channel, data: .byteBuffer(bytes))).wait()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to rewrite this because of "non-sendable" warning on
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, I'll fix these up. |
||
| { error in | ||
|
|
||
| let writePromise = channel.eventLoop.makePromise(of: Void.self) | ||
| channel.pipeline.syncOperations.writeAndFlush( | ||
| NIOAny(SSHChannelData(type: .channel, data: .byteBuffer(bytes))), | ||
| promise: writePromise | ||
| ) | ||
| XCTAssertThrowsError(try writePromise.futureResult.wait()) { error in | ||
| XCTAssertEqual(error as? ChannelError, .ioOnClosedChannel) | ||
| } | ||
|
|
||
|
|
@@ -964,10 +980,12 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
|
|
||
| // Let's create a channel. | ||
| harness.multiplexer.createChildChannel(channelType: .session) { channel, _ in | ||
| channel.setOption(ChannelOptions.autoRead, value: false).flatMap { | ||
| channel.setOption(ChannelOptions.allowRemoteHalfClosure, value: true) | ||
| }.flatMap { | ||
| channel.pipeline.addHandlers([readRecorder.value, eofRecorder.value]) | ||
| channel.eventLoop.makeCompletedFuture { | ||
| // SSH child channel supports sync options so '!' is okay. | ||
| try channel.syncOptions!.setOption(.autoRead, value: false) | ||
| try channel.syncOptions!.setOption(.allowRemoteHalfClosure, value: true) | ||
| try channel.pipeline.syncOperations.addHandler(readRecorder.value) | ||
| try channel.pipeline.syncOperations.addHandler(eofRecorder.value) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1010,7 +1028,9 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| var childChannel: Channel? | ||
| harness.multiplexer.createChildChannel(channelType: .session) { channel, _ in | ||
| childChannel = channel | ||
| return channel.pipeline.addHandler(inactiveRecorder.value) | ||
| return channel.eventLoop.makeCompletedFuture { | ||
| try channel.pipeline.syncOperations.addHandler(inactiveRecorder.value) | ||
| } | ||
| } | ||
|
|
||
| guard let channel = childChannel else { | ||
|
|
@@ -1031,15 +1051,23 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| buffer.writeString("Hello from the unit tests!") | ||
|
|
||
| for _ in 0..<5 { | ||
| channel.write(SSHChannelData(type: .channel, data: .byteBuffer(buffer)), promise: nil) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here |
||
| channel.pipeline.syncOperations.write( | ||
| NIOAny(SSHChannelData(type: .channel, data: .byteBuffer(buffer))), | ||
| promise: nil | ||
| ) | ||
| } | ||
|
|
||
| // Now we're going to add a final write: this will have a write promise. It should complete before | ||
| // the close promise does. | ||
| let finalWriteComplete = NIOLoopBoundBox(false, eventLoop: channel.eventLoop) | ||
| let eofComplete = NIOLoopBoundBox(false, eventLoop: channel.eventLoop) | ||
|
|
||
| channel.write(SSHChannelData(type: .channel, data: .byteBuffer(buffer))).whenSuccess { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and here |
||
| let writePromise = channel.eventLoop.makePromise(of: Void.self) | ||
| channel.pipeline.syncOperations.write( | ||
| NIOAny(SSHChannelData(type: .channel, data: .byteBuffer(buffer))), | ||
| promise: writePromise | ||
| ) | ||
| writePromise.futureResult.whenSuccess { | ||
| XCTAssertFalse(eofComplete.value) | ||
| XCTAssertFalse(inactiveRecorder.value.seenInactive) | ||
| finalWriteComplete.value = true | ||
|
|
@@ -1076,7 +1104,9 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| var childChannel: Channel? | ||
| harness.multiplexer.createChildChannel(channelType: .session) { channel, _ in | ||
| childChannel = channel | ||
| return channel.pipeline.addHandler(readRecorder) | ||
| return channel.eventLoop.makeCompletedFuture { | ||
| try channel.pipeline.syncOperations.addHandler(readRecorder) | ||
| } | ||
| } | ||
|
|
||
| guard let channel = childChannel else { | ||
|
|
@@ -1097,11 +1127,12 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| XCTAssertTrue(channel.isActive) | ||
|
|
||
| // Now write some data. This fails immediately. | ||
| XCTAssertThrowsError( | ||
| try channel.write( | ||
| SSHChannelData(type: .channel, data: .byteBuffer(channel.allocator.buffer(capacity: 1024))) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and here |
||
| ).wait() | ||
| ) { error in | ||
| let writePromise = channel.eventLoop.makePromise(of: Void.self) | ||
| channel.pipeline.syncOperations.write( | ||
| NIOAny(SSHChannelData(type: .channel, data: .byteBuffer(channel.allocator.buffer(capacity: 1024)))), | ||
| promise: writePromise | ||
| ) | ||
| XCTAssertThrowsError(try writePromise.futureResult.wait()) { error in | ||
| XCTAssertEqual(error as? ChannelError, .outputClosed) | ||
| } | ||
| } | ||
|
|
@@ -1214,15 +1245,19 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| buffer.writeBytes(0..<6) | ||
|
|
||
| // Ok, send 3 bytes of data. Nothing happens. However, when this completes writability will still be false. | ||
| channel.write( | ||
| SSHChannelData(type: .channel, data: .byteBuffer(buffer.getSlice(at: buffer.readerIndex, length: 3)!)), | ||
| channel.pipeline.syncOperations.write( | ||
| NIOAny( | ||
| SSHChannelData(type: .channel, data: .byteBuffer(buffer.getSlice(at: buffer.readerIndex, length: 3)!)) | ||
| ), | ||
| promise: nil | ||
| ) | ||
| XCTAssertTrue(channel.isWritable) | ||
|
|
||
| // Now write 2 bytes of stderr. This flips the writability to false. | ||
| channel.write( | ||
| SSHChannelData(type: .channel, data: .byteBuffer(buffer.getSlice(at: buffer.readerIndex, length: 2)!)), | ||
| channel.pipeline.syncOperations.write( | ||
| NIOAny( | ||
| SSHChannelData(type: .channel, data: .byteBuffer(buffer.getSlice(at: buffer.readerIndex, length: 2)!)) | ||
| ), | ||
| promise: nil | ||
| ) | ||
| XCTAssertFalse(channel.isWritable) | ||
|
|
@@ -1234,7 +1269,10 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| XCTAssertEqual(harness.flushedMessages.count, 3) | ||
|
|
||
| // Another attempt at writing queues the write. | ||
| channel.writeAndFlush(SSHChannelData(type: .channel, data: .byteBuffer(buffer)), promise: nil) | ||
| channel.pipeline.syncOperations.writeAndFlush( | ||
| NIOAny(SSHChannelData(type: .channel, data: .byteBuffer(buffer))), | ||
| promise: nil | ||
| ) | ||
| XCTAssertFalse(channel.isWritable) | ||
| XCTAssertEqual(harness.flushedMessages.count, 3) | ||
|
|
||
|
|
@@ -1303,15 +1341,19 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
|
|
||
| // Ok, we're gonna write the first 4 bytes. The channel will stay writable. | ||
| XCTAssertTrue(channel.isWritable) | ||
| channel.writeAndFlush( | ||
| SSHChannelData(type: .channel, data: .byteBuffer(buffer.getSlice(at: buffer.readerIndex, length: 4)!)), | ||
| channel.pipeline.syncOperations.writeAndFlush( | ||
| NIOAny( | ||
| SSHChannelData(type: .channel, data: .byteBuffer(buffer.getSlice(at: buffer.readerIndex, length: 4)!)) | ||
| ), | ||
| promise: nil | ||
| ) | ||
| XCTAssertTrue(channel.isWritable) | ||
|
|
||
| // The next byte makes the channel not writable. | ||
| channel.write( | ||
| SSHChannelData(type: .channel, data: .byteBuffer(buffer.getSlice(at: buffer.readerIndex, length: 1)!)), | ||
| channel.pipeline.syncOperations.write( | ||
| NIOAny( | ||
| SSHChannelData(type: .channel, data: .byteBuffer(buffer.getSlice(at: buffer.readerIndex, length: 1)!)) | ||
| ), | ||
| promise: nil | ||
| ) | ||
| XCTAssertFalse(channel.isWritable) | ||
|
|
@@ -1576,7 +1618,10 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| buffer.writeBytes(0..<6) | ||
|
|
||
| // Ok, send 6 bytes of data immediately. The writability is false. | ||
| channel.writeAndFlush(SSHChannelData(type: .channel, data: .byteBuffer(buffer)), promise: nil) | ||
| channel.pipeline.syncOperations.writeAndFlush( | ||
| NIOAny(SSHChannelData(type: .channel, data: .byteBuffer(buffer))), | ||
| promise: nil | ||
| ) | ||
| XCTAssertFalse(channel.isWritable) | ||
|
|
||
| // Two writes should have occurred, one of size 3 and one of size 2. | ||
|
|
@@ -1598,7 +1643,10 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| XCTAssertEqual(harness.flushedMessages.count, 3) | ||
|
|
||
| // Issue another write, now of extended data, which is also bound by this limit. Again, nothing changes. | ||
| channel.writeAndFlush(SSHChannelData(type: .stdErr, data: .byteBuffer(buffer)), promise: nil) | ||
| channel.pipeline.syncOperations.writeAndFlush( | ||
| NIOAny(SSHChannelData(type: .stdErr, data: .byteBuffer(buffer))), | ||
| promise: nil | ||
| ) | ||
| XCTAssertFalse(channel.isWritable) | ||
| XCTAssertEqual(harness.flushedMessages.count, 3) | ||
|
|
||
|
|
@@ -1670,7 +1718,10 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| buffer.writeBytes(0..<5) | ||
|
|
||
| // Ok, we're gonna write 5 bytes. These will be split into two writes. | ||
| channel.writeAndFlush(SSHChannelData(type: .channel, data: .byteBuffer(buffer)), promise: nil) | ||
| channel.pipeline.syncOperations.writeAndFlush( | ||
| NIOAny(SSHChannelData(type: .channel, data: .byteBuffer(buffer))), | ||
| promise: nil | ||
| ) | ||
| XCTAssertEqual(harness.flushedMessages.count, 3) | ||
| self.assertChannelData( | ||
| harness.flushedMessage(1), | ||
|
|
@@ -1887,9 +1938,12 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
|
|
||
| let harness = self.harness { channel, type in | ||
| initializedChannels.append(type) | ||
|
|
||
| return channel.getOption(SSHChildChannelOptions.sshChannelType).map { type in | ||
| do { | ||
| let type = try channel.syncOptions!.getOption(SSHChildChannelOptions.sshChannelType) | ||
| typesFromOptions.append(type) | ||
| return channel.eventLoop.makeSucceededVoidFuture() | ||
| } catch { | ||
| return channel.eventLoop.makeFailedFuture(error) | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1930,7 +1984,9 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| let readCounter = ReadCountingHandler() | ||
|
|
||
| let harness = self.harness { channel, _ in | ||
| channel.pipeline.addHandler(readCounter) | ||
| channel.eventLoop.makeCompletedFuture { | ||
| try channel.pipeline.syncOperations.addHandler(readCounter) | ||
| } | ||
| } | ||
| defer { | ||
| harness.finish() | ||
|
|
@@ -1984,10 +2040,13 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| let readRecorder = ReadRecordingHandler() | ||
|
|
||
| let harness = self.harness { channel, _ in | ||
| channel.setOption(ChannelOptions.autoRead, value: true).flatMap { | ||
| channel.setOption(ChannelOptions.allowRemoteHalfClosure, value: true) | ||
| }.flatMap { | ||
| channel.pipeline.addHandlers([readRecorder, eofHandler]) | ||
| channel.eventLoop.makeCompletedFuture { | ||
| let options = channel.syncOptions! | ||
| try options.setOption(.autoRead, value: true) | ||
| try options.setOption(.allowRemoteHalfClosure, value: true) | ||
| let sync = channel.pipeline.syncOperations | ||
| try sync.addHandler(readRecorder) | ||
| try sync.addHandler(eofHandler) | ||
| } | ||
| } | ||
| defer { | ||
|
|
@@ -2020,10 +2079,13 @@ final class ChildChannelMultiplexerTests: XCTestCase { | |
| let readRecorder = ReadRecordingHandler() | ||
|
|
||
| let harness = self.harness { channel, _ in | ||
| channel.setOption(ChannelOptions.autoRead, value: true).flatMap { | ||
| channel.setOption(ChannelOptions.allowRemoteHalfClosure, value: true) | ||
| }.flatMap { | ||
| channel.pipeline.addHandlers([readRecorder, eofHandler]) | ||
| channel.eventLoop.makeCompletedFuture { | ||
| let options = channel.syncOptions! | ||
| try options.setOption(.autoRead, value: true) | ||
| try options.setOption(.allowRemoteHalfClosure, value: true) | ||
| let sync = channel.pipeline.syncOperations | ||
| try sync.addHandler(readRecorder) | ||
| try sync.addHandler(eofHandler) | ||
| } | ||
| } | ||
| defer { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: any reason we need
syncvariable? why not just chain?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No reason beyond habit!