Skip to content
Prev Previous commit
Next Next commit
Update tests to use Atomics
  • Loading branch information
stevapple committed Jun 24, 2022
commit 4b0d4a93fd2ec0a83591ea03609aa9cf323f473f
203 changes: 102 additions & 101 deletions Tests/NIOEmbeddedTests/AsyncEmbeddedEventLoopTests.swift

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions Tests/NIOEmbeddedTests/TestUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Atomics
import Foundation
import XCTest
import NIOCore
Expand Down Expand Up @@ -60,13 +61,13 @@ extension EventLoopFuture {
}
}

internal func XCTAssertCompareAndSwapSucceeds<Type: NIOAtomicPrimitive>(
storage: NIOAtomic<Type>,
internal func XCTAssertCompareAndSwapSucceeds<Type: AtomicValue>(
storage: ManagedAtomic<Type>,
expected: Type,
desired: Type,
file: StaticString = #file,
line: UInt = #line
) {
let swapped = storage.compareAndExchange(expected: expected, desired: desired)
XCTAssertTrue(swapped, file: file, line: line)
let result = storage.compareExchange(expected: expected, desired: desired, ordering: .relaxed)
XCTAssertTrue(result.exchanged, file: file, line: line)
}
8 changes: 4 additions & 4 deletions Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import XCTest
import NIOCore
@testable import NIOPosix
import NIOConcurrencyHelpers
import Atomics


public final class AcceptBackoffHandlerTest: XCTestCase {
Expand Down Expand Up @@ -204,9 +204,9 @@ public final class AcceptBackoffHandlerTest: XCTestCase {

let readCountHandler = ReadCountHandler()

let backoffProviderCalled = NIOAtomic<Int>.makeAtomic(value: 0)
let backoffProviderCalled = ManagedAtomic(0)
let serverChannel = try setupChannel(group: group, readCountHandler: readCountHandler, backoffProvider: { err in
if backoffProviderCalled.add(1) == 0 {
if backoffProviderCalled.loadThenWrappingIncrement(ordering: .relaxed) == 0 {
return .seconds(1)
}
return .seconds(2)
Expand Down Expand Up @@ -234,7 +234,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase {

XCTAssertNoThrow(try serverChannel.syncCloseAcceptingAlreadyClosed())

XCTAssertEqual(2, backoffProviderCalled.load())
XCTAssertEqual(2, backoffProviderCalled.load(ordering: .relaxed))
}

private final class ReadCountHandler: ChannelOutboundHandler {
Expand Down
14 changes: 7 additions & 7 deletions Tests/NIOPosixTests/ChannelPipelineTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//

import XCTest
import NIOConcurrencyHelpers
import Atomics
@testable import NIOCore
import NIOEmbedded
import NIOPosix
Expand Down Expand Up @@ -148,24 +148,24 @@ class ChannelPipelineTest: XCTestCase {

let handler = DummyHandler()
defer {
XCTAssertFalse(handler.handlerAddedCalled.load())
XCTAssertFalse(handler.handlerRemovedCalled.load())
XCTAssertFalse(handler.handlerAddedCalled.load(ordering: .relaxed))
XCTAssertFalse(handler.handlerRemovedCalled.load(ordering: .relaxed))
}
XCTAssertThrowsError(try channel.pipeline.addHandler(handler).wait()) { error in
XCTAssertEqual(.ioOnClosedChannel, error as? ChannelError)
}
}

private final class DummyHandler: ChannelHandler {
let handlerAddedCalled = NIOAtomic<Bool>.makeAtomic(value: false)
let handlerRemovedCalled = NIOAtomic<Bool>.makeAtomic(value: false)
let handlerAddedCalled = ManagedAtomic(false)
let handlerRemovedCalled = ManagedAtomic(false)

public func handlerAdded(context: ChannelHandlerContext) {
handlerAddedCalled.store(true)
handlerAddedCalled.store(true, ordering: .relaxed)
}

public func handlerRemoved(context: ChannelHandlerContext) {
handlerRemovedCalled.store(true)
handlerRemovedCalled.store(true, ordering: .relaxed)
}
}

Expand Down
26 changes: 13 additions & 13 deletions Tests/NIOPosixTests/EchoServerClientTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//

import XCTest
import NIOConcurrencyHelpers
import Atomics
import Dispatch
import NIOCore
@testable import NIOPosix
Expand Down Expand Up @@ -352,8 +352,8 @@ class EchoServerClientTest : XCTestCase {

private final class CloseInInActiveAndUnregisteredChannelHandler: ChannelInboundHandler {
typealias InboundIn = Never
let alreadyClosedInChannelInactive = NIOAtomic<Bool>.makeAtomic(value: false)
let alreadyClosedInChannelUnregistered = NIOAtomic<Bool>.makeAtomic(value: false)
let alreadyClosedInChannelInactive = ManagedAtomic(false)
let alreadyClosedInChannelUnregistered = ManagedAtomic(false)
let channelUnregisteredPromise: EventLoopPromise<Void>
let channelInactivePromise: EventLoopPromise<Void>

Expand All @@ -370,7 +370,7 @@ class EchoServerClientTest : XCTestCase {
}

public func channelInactive(context: ChannelHandlerContext) {
if alreadyClosedInChannelInactive.compareAndExchange(expected: false, desired: true) {
if alreadyClosedInChannelInactive.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged {
XCTAssertFalse(self.channelUnregisteredPromise.futureResult.isFulfilled,
"channelInactive should fire before channelUnregistered")
context.close().map {
Expand All @@ -390,7 +390,7 @@ class EchoServerClientTest : XCTestCase {
}

public func channelUnregistered(context: ChannelHandlerContext) {
if alreadyClosedInChannelUnregistered.compareAndExchange(expected: false, desired: true) {
if alreadyClosedInChannelUnregistered.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged {
XCTAssertTrue(self.channelInactivePromise.futureResult.isFulfilled,
"when channelUnregister fires, channelInactive should already have fired")
context.close().map {
Expand Down Expand Up @@ -459,8 +459,8 @@ class EchoServerClientTest : XCTestCase {

_ = try inactivePromise.futureResult.and(unregistredPromise.futureResult).wait()

XCTAssertTrue(handler.alreadyClosedInChannelInactive.load())
XCTAssertTrue(handler.alreadyClosedInChannelUnregistered.load())
XCTAssertTrue(handler.alreadyClosedInChannelInactive.load(ordering: .relaxed))
XCTAssertTrue(handler.alreadyClosedInChannelUnregistered.load(ordering: .relaxed))
}

func testFlushOnEmpty() throws {
Expand Down Expand Up @@ -758,17 +758,17 @@ class EchoServerClientTest : XCTestCase {
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
let acceptedRemotePort: NIOAtomic<Int> = .makeAtomic(value: -1)
let acceptedLocalPort: NIOAtomic<Int> = .makeAtomic(value: -2)
let acceptedRemotePort = ManagedAtomic<Int>(-1)
let acceptedLocalPort = ManagedAtomic<Int>(-2)
let sem = DispatchSemaphore(value: 0)

let serverChannel: Channel
do {
serverChannel = try ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.childChannelInitializer { channel in
acceptedRemotePort.store(channel.remoteAddress?.port ?? -3)
acceptedLocalPort.store(channel.localAddress?.port ?? -4)
acceptedRemotePort.store(channel.remoteAddress?.port ?? -3, ordering: .relaxed)
acceptedLocalPort.store(channel.localAddress?.port ?? -4, ordering: .relaxed)
sem.signal()
return channel.eventLoop.makeSucceededFuture(())
}.bind(host: host, port: 0).wait()
Expand Down Expand Up @@ -798,8 +798,8 @@ class EchoServerClientTest : XCTestCase {
}
sem.wait()
XCTAssertEqual(serverChannel.localAddress?.port, clientChannel.remoteAddress?.port)
XCTAssertEqual(acceptedLocalPort.load(), clientChannel.remoteAddress?.port ?? -5)
XCTAssertEqual(acceptedRemotePort.load(), clientChannel.localAddress?.port ?? -6)
XCTAssertEqual(acceptedLocalPort.load(ordering: .relaxed), clientChannel.remoteAddress?.port ?? -5)
XCTAssertEqual(acceptedRemotePort.load(ordering: .relaxed), clientChannel.localAddress?.port ?? -6)
}
XCTAssertTrue(atLeastOneSucceeded)
}
Expand Down
35 changes: 18 additions & 17 deletions Tests/NIOPosixTests/EventLoopTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import XCTest
@testable import NIOCore
import NIOEmbedded
@testable import NIOPosix
import Atomics
import Dispatch
import NIOConcurrencyHelpers

Expand Down Expand Up @@ -145,14 +146,14 @@ public final class EventLoopTest : XCTestCase {
XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully())
}

let counter = NIOAtomic<Int>.makeAtomic(value: 0)
let counter = ManagedAtomic<Int>(0)
let loop = eventLoopGroup.next()
let allDone = DispatchGroup()
allDone.enter()
loop.scheduleRepeatedTask(initialDelay: initialDelay, delay: delay) { repeatedTask -> Void in
XCTAssertTrue(loop.inEventLoop)
let initialValue = counter.load()
counter.add(1)
let initialValue = counter.load(ordering: .relaxed)
counter.wrappingIncrement(ordering: .relaxed)
if initialValue == 0 {
XCTAssertTrue(NIODeadline.now() - nanos >= initialDelay)
} else if initialValue == count {
Expand All @@ -163,7 +164,7 @@ public final class EventLoopTest : XCTestCase {

allDone.wait()

XCTAssertEqual(counter.load(), count + 1)
XCTAssertEqual(counter.load(ordering: .relaxed), count + 1)
XCTAssertTrue(NIODeadline.now() - nanos >= initialDelay + Int64(count) * delay)
}

Expand Down Expand Up @@ -657,12 +658,12 @@ public final class EventLoopTest : XCTestCase {
class AssertHandler: ChannelInboundHandler {
typealias InboundIn = Any

let groupIsShutdown = NIOAtomic.makeAtomic(value: false)
let removed = NIOAtomic.makeAtomic(value: false)
let groupIsShutdown = ManagedAtomic(false)
let removed = ManagedAtomic(false)

public func handlerRemoved(context: ChannelHandlerContext) {
XCTAssertFalse(groupIsShutdown.load())
XCTAssertTrue(removed.compareAndExchange(expected: false, desired: true))
XCTAssertFalse(groupIsShutdown.load(ordering: .relaxed))
XCTAssertTrue(removed.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged)
}
}

Expand All @@ -679,16 +680,16 @@ public final class EventLoopTest : XCTestCase {
channel.connect(to: serverSocket.localAddress!)
}
}.wait() as Void)
let closeFutureFulfilledEventually = NIOAtomic<Bool>.makeAtomic(value: false)
let closeFutureFulfilledEventually = ManagedAtomic(false)
XCTAssertFalse(channel.closeFuture.isFulfilled)
channel.closeFuture.whenSuccess {
XCTAssertTrue(closeFutureFulfilledEventually.compareAndExchange(expected: false, desired: true))
XCTAssertTrue(closeFutureFulfilledEventually.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged)
}
XCTAssertNoThrow(try group.syncShutdownGracefully())
XCTAssertTrue(assertHandler.groupIsShutdown.compareAndExchange(expected: false, desired: true))
XCTAssertTrue(assertHandler.removed.load())
XCTAssertTrue(assertHandler.groupIsShutdown.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged)
XCTAssertTrue(assertHandler.removed.load(ordering: .relaxed))
XCTAssertFalse(channel.isActive)
XCTAssertTrue(closeFutureFulfilledEventually.load())
XCTAssertTrue(closeFutureFulfilledEventually.load(ordering: .relaxed))
}

public func testScheduleMultipleTasks() throws {
Expand Down Expand Up @@ -1026,11 +1027,11 @@ public final class EventLoopTest : XCTestCase {
final class ExecuteSomethingOnEventLoop: ChannelInboundHandler {
typealias InboundIn = ByteBuffer

static let numberOfInstances = NIOAtomic<Int>.makeAtomic(value: 0)
static let numberOfInstances = ManagedAtomic<Int>(0)
let groupToNotify: DispatchGroup

init(groupToNotify: DispatchGroup) {
XCTAssertEqual(0, ExecuteSomethingOnEventLoop.numberOfInstances.add(1))
XCTAssertEqual(0, ExecuteSomethingOnEventLoop.numberOfInstances.loadThenWrappingIncrement(ordering: .relaxed))
self.groupToNotify = groupToNotify
}

Expand Down Expand Up @@ -1565,10 +1566,10 @@ fileprivate class EventLoopWithoutPreSucceededFuture: EventLoop {

final class EventLoopGroupOf3WithoutAnAnyImplementation: EventLoopGroup {
private let eventloops = [EmbeddedEventLoop(), EmbeddedEventLoop(), EmbeddedEventLoop()]
private let nextID = NIOAtomic<UInt64>.makeAtomic(value: 0)
private let nextID = ManagedAtomic<UInt64>(0)

func next() -> EventLoop {
return self.eventloops[Int(self.nextID.add(1) % UInt64(self.eventloops.count))]
return self.eventloops[Int(self.nextID.loadThenWrappingIncrement(ordering: .relaxed) % UInt64(self.eventloops.count))]
}

func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
Expand Down
8 changes: 4 additions & 4 deletions Tests/NIOPosixTests/NonBlockingFileIOTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import XCTest
import NIOCore
@testable import NIOPosix
import NIOConcurrencyHelpers
import Atomics

class NonBlockingFileIOTest: XCTestCase {
private var group: EventLoopGroup!
Expand Down Expand Up @@ -862,19 +862,19 @@ class NonBlockingFileIOTest: XCTestCase {
let numberOfChunks = 2_000
XCTAssertNoThrow(try withTemporaryFile(content: String(repeating: "X",
count: numberOfChunks)) { (fileHandle, path) in
let numberOfCalls = NIOAtomic<Int>.makeAtomic(value: 0)
let numberOfCalls = ManagedAtomic(0)
XCTAssertNoThrow(try self.fileIO.readChunked(fileHandle: fileHandle,
fromOffset: 0,
byteCount: numberOfChunks,
chunkSize: 1,
allocator: self.allocator,
eventLoop: self.eventLoop) { buffer in
numberOfCalls.add(1)
numberOfCalls.wrappingIncrement(ordering: .relaxed)
XCTAssertEqual(1, buffer.readableBytes)
XCTAssertEqual(UInt8(ascii: "X"), buffer.readableBytesView.first)
return self.eventLoop.makeSucceededFuture(())
}.wait())
XCTAssertEqual(numberOfChunks, numberOfCalls.load())
XCTAssertEqual(numberOfChunks, numberOfCalls.load(ordering: .relaxed))
})
}

Expand Down
18 changes: 9 additions & 9 deletions Tests/NIOPosixTests/SALChannelTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import XCTest
import NIOCore
@testable import NIOPosix
import NIOConcurrencyHelpers
import Atomics

final class SALChannelTest: XCTestCase, SALTest {
var group: MultiThreadedEventLoopGroup!
Expand Down Expand Up @@ -68,24 +68,24 @@ final class SALChannelTest: XCTestCase, SALTest {
let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5)
var buffer = ByteBuffer(string: "12")

let writableNotificationStepExpectation = NIOAtomic<Int>.makeAtomic(value: 0)
let writableNotificationStepExpectation = ManagedAtomic(0)

final class DoWriteFromWritabilityChangedNotification: ChannelInboundHandler {
typealias InboundIn = ByteBuffer
typealias OutboundOut = ByteBuffer

var numberOfCalls = 0

var writableNotificationStepExpectation: NIOAtomic<Int>
var writableNotificationStepExpectation: ManagedAtomic<Int>

init(writableNotificationStepExpectation: NIOAtomic<Int>) {
init(writableNotificationStepExpectation: ManagedAtomic<Int>) {
self.writableNotificationStepExpectation = writableNotificationStepExpectation
}

func channelWritabilityChanged(context: ChannelHandlerContext) {
self.numberOfCalls += 1

XCTAssertEqual(self.writableNotificationStepExpectation.load(),
XCTAssertEqual(self.writableNotificationStepExpectation.load(ordering: .relaxed),
numberOfCalls)
switch self.numberOfCalls {
case 1:
Expand All @@ -100,7 +100,7 @@ final class SALChannelTest: XCTestCase, SALTest {
buffer.writeString("ABC")

// We expect another channelWritabilityChanged notification
XCTAssertTrue(self.writableNotificationStepExpectation.compareAndExchange(expected: 2, desired: 3))
XCTAssertTrue(self.writableNotificationStepExpectation.compareExchange(expected: 2, desired: 3, ordering: .relaxed).exchanged)
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
case 3:
// Next, we should go to false because we never send all the bytes.
Expand Down Expand Up @@ -134,7 +134,7 @@ final class SALChannelTest: XCTestCase, SALTest {
}

// Before sending back the writable notification, we know that that'll trigger a Channel writability change
XCTAssertTrue(writableNotificationStepExpectation.compareAndExchange(expected: 1, desired: 2))
XCTAssertTrue(writableNotificationStepExpectation.compareExchange(expected: 1, desired: 2, ordering: .relaxed).exchanged)
let writableEvent = SelectorEvent(io: [.write],
registration: NIORegistration(channel: .socketChannel(channel),
interested: [.write],
Expand All @@ -157,7 +157,7 @@ final class SALChannelTest: XCTestCase, SALTest {

// This time, we'll make the write write everything which should also lead to a final channelWritability
// change.
XCTAssertTrue(writableNotificationStepExpectation.compareAndExchange(expected: 3, desired: 4))
XCTAssertTrue(writableNotificationStepExpectation.compareExchange(expected: 3, desired: 4, ordering: .relaxed).exchanged)
try self.assertWrite(expectedFD: .max,
expectedBytes: buffer,
return: .processed(2))
Expand All @@ -177,7 +177,7 @@ final class SALChannelTest: XCTestCase, SALTest {
channel.pipeline.addHandler(DoWriteFromWritabilityChangedNotification(writableNotificationStepExpectation: writableNotificationStepExpectation))
}.flatMap {
// This write should cause a Channel writability change.
XCTAssertTrue(writableNotificationStepExpectation.compareAndExchange(expected: 0, desired: 1))
XCTAssertTrue(writableNotificationStepExpectation.compareExchange(expected: 0, desired: 1, ordering: .relaxed).exchanged)
return channel.writeAndFlush(buffer)
}
}.salWait())
Expand Down
Loading