diff --git a/Package.swift b/Package.swift index 8bf6697243f..612a905703e 100644 --- a/Package.swift +++ b/Package.swift @@ -15,6 +15,8 @@ import PackageDescription +let swiftAtomics: PackageDescription.Target.Dependency = .product(name: "Atomics", package: "swift-atomics") + var targets: [PackageDescription.Target] = [ .target(name: "NIOCore", dependencies: ["NIOConcurrencyHelpers", "CNIOLinux", "CNIOWindows"]), @@ -22,14 +24,16 @@ var targets: [PackageDescription.Target] = [ .target(name: "NIOEmbedded", dependencies: ["NIOCore", "NIOConcurrencyHelpers", - "_NIODataStructures"]), + "_NIODataStructures", + swiftAtomics]), .target(name: "NIOPosix", dependencies: ["CNIOLinux", "CNIODarwin", "CNIOWindows", "NIOConcurrencyHelpers", "NIOCore", - "_NIODataStructures"]), + "_NIODataStructures", + swiftAtomics]), .target(name: "NIO", dependencies: ["NIOCore", "NIOEmbedded", @@ -87,7 +91,7 @@ var targets: [PackageDescription.Target] = [ dependencies: ["NIOPosix", "NIOCore"], exclude: ["README.md"]), .target(name: "NIOTestUtils", - dependencies: ["NIOPosix", "NIOCore", "NIOEmbedded", "NIOHTTP1"]), + dependencies: ["NIOPosix", "NIOCore", "NIOEmbedded", "NIOHTTP1", swiftAtomics]), .executableTarget(name: "NIOCrashTester", dependencies: ["NIOPosix", "NIOCore", "NIOEmbedded", "NIOHTTP1", "NIOWebSocket", "NIOFoundationCompat"]), .executableTarget(name: "NIOAsyncAwaitDemo", @@ -135,6 +139,7 @@ let package = Package( .library(name: "NIOTestUtils", targets: ["NIOTestUtils"]), ], dependencies: [ + .package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"), ], targets: targets ) diff --git a/Sources/NIOConcurrencyHelpers/NIOAtomic.swift b/Sources/NIOConcurrencyHelpers/NIOAtomic.swift index b4ada0364ff..805f860d804 100644 --- a/Sources/NIOConcurrencyHelpers/NIOAtomic.swift +++ b/Sources/NIOConcurrencyHelpers/NIOAtomic.swift @@ -193,6 +193,7 @@ extension UInt: NIOAtomicPrimitive { /// By necessity, all atomic values are references: after all, it makes no /// sense to talk about managing an atomic value when each time it's modified /// the thread that modified it gets a local copy! +@available(*, deprecated, message:"please use ManagedAtomic from https://github.com/apple/swift-atomics instead") public final class NIOAtomic { @usableFromInline typealias Manager = ManagedBufferPointer @@ -313,6 +314,7 @@ public final class NIOAtomic { } #if compiler(>=5.5) && canImport(_Concurrency) +@available(*, deprecated) extension NIOAtomic: Sendable { } diff --git a/Sources/NIOConcurrencyHelpers/atomics.swift b/Sources/NIOConcurrencyHelpers/atomics.swift index af74f47d7ca..0abb452b616 100644 --- a/Sources/NIOConcurrencyHelpers/atomics.swift +++ b/Sources/NIOConcurrencyHelpers/atomics.swift @@ -51,6 +51,7 @@ fileprivate func sys_sched_yield() { /// Atomic primitives are useful when building constructs that need to /// communicate or cooperate across multiple threads. In the case of /// SwiftNIO this usually involves communicating across multiple event loops. +@available(*, deprecated, message: "please use UnsafeAtomic from https://github.com/apple/swift-atomics instead") public struct UnsafeEmbeddedAtomic { @usableFromInline internal let value: OpaquePointer @@ -173,7 +174,7 @@ public struct UnsafeEmbeddedAtomic { /// By necessity, all atomic values are references: after all, it makes no /// sense to talk about managing an atomic value when each time it's modified /// the thread that modified it gets a local copy! -@available(*, deprecated, message:"please use NIOAtomic instead") +@available(*, deprecated, message:"please use ManagedAtomic from https://github.com/apple/swift-atomics instead") public final class Atomic { @usableFromInline internal let embedded: UnsafeEmbeddedAtomic diff --git a/Sources/NIOEmbedded/AsyncEmbeddedEventLoop.swift b/Sources/NIOEmbedded/AsyncEmbeddedEventLoop.swift index 1a1092d1949..d562a243ecd 100644 --- a/Sources/NIOEmbedded/AsyncEmbeddedEventLoop.swift +++ b/Sources/NIOEmbedded/AsyncEmbeddedEventLoop.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// #if compiler(>=5.5.2) && canImport(_Concurrency) +import Atomics import Dispatch import _NIODataStructures import NIOCore @@ -62,9 +63,9 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable { /// The current "time" for this event loop. This is an amount in nanoseconds. /// As we need to access this from any thread, we store this as an atomic. - private let _now = NIOAtomic.makeAtomic(value: 0) + private let _now = ManagedAtomic(0) internal var now: NIODeadline { - return NIODeadline.uptimeNanoseconds(self._now.load()) + return NIODeadline.uptimeNanoseconds(self._now.load(ordering: .relaxed)) } /// This is used to derive an identifier for this loop. @@ -80,7 +81,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable { // arbitrary threads. This is required by the EventLoop protocol and cannot be avoided. // Specifically, Scheduled creation requires us to be able to define the cancellation // operation, so the task ID has to be created early. - private let scheduledTaskCounter = NIOAtomic.makeAtomic(value: 0) + private let scheduledTaskCounter = ManagedAtomic(0) private var scheduledTasks = PriorityQueue() /// Keep track of where promises are allocated to ensure we can identify their source if they leak. @@ -143,7 +144,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable { @discardableResult public func scheduleTask(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled { let promise: EventLoopPromise = self.makePromise() - let taskID = self.scheduledTaskCounter.add(1) + let taskID = self.scheduledTaskCounter.loadThenWrappingIncrement(ordering: .relaxed) let scheduled = Scheduled(promise: promise, cancellationTask: { if self.inEventLoop { @@ -270,7 +271,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable { // Set the time correctly before we call into user code, then // call in for all tasks. - self._now.store(nextTask.readyTime.uptimeNanoseconds) + self._now.store(nextTask.readyTime.uptimeNanoseconds, ordering: .relaxed) for task in tasks { task.task() @@ -280,7 +281,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable { } // Finally ensure we got the time right. - self._now.store(newTime.uptimeNanoseconds) + self._now.store(newTime.uptimeNanoseconds, ordering: .relaxed) continuation.resume() } @@ -311,7 +312,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable { internal func drainScheduledTasksByRunningAllCurrentlyScheduledTasks() { var currentlyScheduledTasks = self.scheduledTasks while let nextTask = currentlyScheduledTasks.pop() { - self._now.store(nextTask.readyTime.uptimeNanoseconds) + self._now.store(nextTask.readyTime.uptimeNanoseconds, ordering: .relaxed) nextTask.task() } // Just fail all the remaining scheduled tasks. Despite having run all the tasks that were diff --git a/Sources/NIOPosix/BaseSocketChannel.swift b/Sources/NIOPosix/BaseSocketChannel.swift index 47ae117df30..0d674c6cb9e 100644 --- a/Sources/NIOPosix/BaseSocketChannel.swift +++ b/Sources/NIOPosix/BaseSocketChannel.swift @@ -14,6 +14,7 @@ import NIOCore import NIOConcurrencyHelpers +import Atomics private struct SocketChannelLifecycleManager { // MARK: Types @@ -35,7 +36,7 @@ private struct SocketChannelLifecycleManager { // MARK: properties private let eventLoop: EventLoop // this is queried from the Channel, ie. must be thread-safe - internal let isActiveAtomic: NIOAtomic + internal let isActiveAtomic: ManagedAtomic // these are only to be accessed on the EventLoop // have we seen the `.readEOF` notification @@ -50,9 +51,9 @@ private struct SocketChannelLifecycleManager { self.eventLoop.assertInEventLoop() switch (oldValue, self.currentState) { case (_, .activated): - self.isActiveAtomic.store(true) + self.isActiveAtomic.store(true, ordering: .relaxed) case (.activated, _): - self.isActiveAtomic.store(false) + self.isActiveAtomic.store(false, ordering: .relaxed) default: () } @@ -63,7 +64,7 @@ private struct SocketChannelLifecycleManager { // isActiveAtomic needs to be injected as it's accessed from arbitrary threads and `SocketChannelLifecycleManager` is usually held mutable internal init( eventLoop: EventLoop, - isActiveAtomic: NIOAtomic, + isActiveAtomic: ManagedAtomic, supportReconnect: Bool ) { self.eventLoop = eventLoop @@ -238,7 +239,7 @@ class BaseSocketChannel: SelectableChannel, Chan private let closePromise: EventLoopPromise internal let selectableEventLoop: SelectableEventLoop private let _offEventLoopLock = Lock() - private let isActiveAtomic: NIOAtomic = .makeAtomic(value: false) + private let isActiveAtomic: ManagedAtomic = .init(false) // just a thread-safe way of having something to print about the socket from any thread internal let socketDescription: String @@ -345,7 +346,7 @@ class BaseSocketChannel: SelectableChannel, Chan // This is `Channel` API so must be thread-safe. public var isActive: Bool { - return self.isActiveAtomic.load() + return self.isActiveAtomic.load(ordering: .relaxed) } // This is `Channel` API so must be thread-safe. diff --git a/Sources/NIOPosix/MultiThreadedEventLoopGroup.swift b/Sources/NIOPosix/MultiThreadedEventLoopGroup.swift index e14a6b8b562..5ede87a1f72 100644 --- a/Sources/NIOPosix/MultiThreadedEventLoopGroup.swift +++ b/Sources/NIOPosix/MultiThreadedEventLoopGroup.swift @@ -15,6 +15,7 @@ import NIOCore import NIOConcurrencyHelpers import Dispatch +import Atomics struct NIORegistration: Registration { enum ChannelType { @@ -33,7 +34,7 @@ struct NIORegistration: Registration { var registrationID: SelectorRegistrationID } -private let nextEventLoopGroupID = NIOAtomic.makeAtomic(value: 0) +private let nextEventLoopGroupID = ManagedAtomic(0) /// Called per `NIOThread` that is created for an EventLoop to do custom initialization of the `NIOThread` before the actual `EventLoop` is run on it. typealias ThreadInitializer = (NIOThread) -> Void @@ -62,7 +63,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup { private static let threadSpecificEventLoop = ThreadSpecificVariable() private let myGroupID: Int - private let index = NIOAtomic.makeAtomic(value: 0) + private let index = ManagedAtomic(0) private var eventLoops: [SelectableEventLoop] private let shutdownLock: Lock = Lock() private var runState: RunState = .running @@ -148,7 +149,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup { /// - threadInitializers: The `ThreadInitializer`s to use. internal init(threadInitializers: [ThreadInitializer], selectorFactory: @escaping () throws -> NIOPosix.Selector = NIOPosix.Selector.init) { - let myGroupID = nextEventLoopGroupID.add(1) + let myGroupID = nextEventLoopGroupID.loadThenWrappingIncrement(ordering: .relaxed) self.myGroupID = myGroupID var idx = 0 self.eventLoops = [] // Just so we're fully initialised and can vend `self` to the `SelectableEventLoop`. @@ -187,7 +188,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup { /// /// - returns: The next `EventLoop` to use. public func next() -> EventLoop { - return eventLoops[abs(index.add(1) % eventLoops.count)] + return eventLoops[abs(index.loadThenWrappingIncrement(ordering: .relaxed) % eventLoops.count)] } /// Returns the current `EventLoop` if we are on an `EventLoop` of this `MultiThreadedEventLoopGroup` instance. diff --git a/Sources/NIOPosix/PendingDatagramWritesManager.swift b/Sources/NIOPosix/PendingDatagramWritesManager.swift index 7849f0e96c3..4cd70368ce3 100644 --- a/Sources/NIOPosix/PendingDatagramWritesManager.swift +++ b/Sources/NIOPosix/PendingDatagramWritesManager.swift @@ -12,7 +12,7 @@ // //===----------------------------------------------------------------------===// import NIOCore -import NIOConcurrencyHelpers +import Atomics private struct PendingDatagramWrite { var data: ByteBuffer @@ -400,7 +400,7 @@ final class PendingDatagramWritesManager: PendingWritesManager { private var state = PendingDatagramWritesState() internal var waterMark: ChannelOptions.Types.WriteBufferWaterMark = ChannelOptions.Types.WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024) - internal let channelWritabilityFlag: NIOAtomic = .makeAtomic(value: true) + internal let channelWritabilityFlag = ManagedAtomic(true) internal var publishedWritability = true internal var writeSpinCount: UInt = 16 private(set) var isOpen = true @@ -452,7 +452,8 @@ final class PendingDatagramWritesManager: PendingWritesManager { assert(self.isOpen) self.state.append(pendingWrite) - if self.state.bytes > waterMark.high && channelWritabilityFlag.compareAndExchange(expected: true, desired: false) { + if self.state.bytes > waterMark.high && + channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged { // Returns false to signal the Channel became non-writable and we need to notify the user. self.publishedWritability = false return false @@ -550,7 +551,7 @@ final class PendingDatagramWritesManager: PendingWritesManager { let (promise, result) = self.state.didWrite(data, messages: messages) if self.state.bytes < waterMark.low { - channelWritabilityFlag.store(true) + channelWritabilityFlag.store(true, ordering: .relaxed) } self.fulfillPromise(promise) diff --git a/Sources/NIOPosix/PendingWritesManager.swift b/Sources/NIOPosix/PendingWritesManager.swift index 55bda93d22b..20402811981 100644 --- a/Sources/NIOPosix/PendingWritesManager.swift +++ b/Sources/NIOPosix/PendingWritesManager.swift @@ -12,7 +12,7 @@ // //===----------------------------------------------------------------------===// import NIOCore -import NIOConcurrencyHelpers +import Atomics private struct PendingStreamWrite { var data: IOData @@ -283,7 +283,7 @@ final class PendingStreamWritesManager: PendingWritesManager { private var storageRefs: UnsafeMutableBufferPointer> internal var waterMark: ChannelOptions.Types.WriteBufferWaterMark = ChannelOptions.Types.WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024) - internal let channelWritabilityFlag: NIOAtomic = .makeAtomic(value: true) + internal let channelWritabilityFlag = ManagedAtomic(true) internal var publishedWritability = true internal var writeSpinCount: UInt = 16 @@ -315,7 +315,8 @@ final class PendingStreamWritesManager: PendingWritesManager { assert(self.isOpen) self.state.append(.init(data: data, promise: promise)) - if self.state.bytes > waterMark.high && channelWritabilityFlag.compareAndExchange(expected: true, desired: false) { + if self.state.bytes > waterMark.high && + channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged { // Returns false to signal the Channel became non-writable and we need to notify the user. self.publishedWritability = false return false @@ -364,7 +365,7 @@ final class PendingStreamWritesManager: PendingWritesManager { let (promise, result) = self.state.didWrite(itemCount: itemCount, result: result) if self.state.bytes < waterMark.low { - channelWritabilityFlag.store(true) + channelWritabilityFlag.store(true, ordering: .relaxed) } promise?.succeed(()) @@ -459,7 +460,7 @@ internal protocol PendingWritesManager: AnyObject { var isFlushPending: Bool { get } var writeSpinCount: UInt { get } var currentBestWriteMechanism: WriteMechanism { get } - var channelWritabilityFlag: NIOAtomic { get } + var channelWritabilityFlag: ManagedAtomic { get } /// Represents the writability state the last time we published a writability change to the `Channel`. /// This is used in `triggerWriteOperations` to determine whether we need to trigger a writability @@ -470,7 +471,7 @@ internal protocol PendingWritesManager: AnyObject { extension PendingWritesManager { // This is called from `Channel` API so must be thread-safe. var isWritable: Bool { - return self.channelWritabilityFlag.load() + return self.channelWritabilityFlag.load(ordering: .relaxed) } internal func triggerWriteOperations(triggerOneWriteOperation: (WriteMechanism) throws -> OneWriteOperationResult) throws -> OverallWriteResult { @@ -514,6 +515,6 @@ extension PendingWritesManager { extension PendingStreamWritesManager: CustomStringConvertible { var description: String { return "PendingStreamWritesManager { isFlushPending: \(self.isFlushPending), " + - /* */ "writabilityFlag: \(self.channelWritabilityFlag.load())), state: \(self.state) }" + /* */ "writabilityFlag: \(self.channelWritabilityFlag.load(ordering: .relaxed))), state: \(self.state) }" } } diff --git a/Sources/NIOPosix/SelectableEventLoop.swift b/Sources/NIOPosix/SelectableEventLoop.swift index 35e4372e34a..e1791f285e3 100644 --- a/Sources/NIOPosix/SelectableEventLoop.swift +++ b/Sources/NIOPosix/SelectableEventLoop.swift @@ -16,6 +16,7 @@ import Dispatch import NIOCore import NIOConcurrencyHelpers import _NIODataStructures +import Atomics /// Execute the given closure and ensure we release all auto pools if needed. @inlinable @@ -73,7 +74,7 @@ internal final class SelectableEventLoop: EventLoop { // This may only be read/written while holding the _tasksLock. internal var _pendingTaskPop = false @usableFromInline - internal var scheduledTaskCounter = NIOAtomic.makeAtomic(value: UInt64(0)) + internal var scheduledTaskCounter = ManagedAtomic(0) @usableFromInline internal var _scheduledTasks = PriorityQueue() @@ -276,7 +277,7 @@ Further information: @inlinable internal func scheduleTask(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled { let promise: EventLoopPromise = self.makePromise() - let task = ScheduledTask(id: self.scheduledTaskCounter.add(1), { + let task = ScheduledTask(id: self.scheduledTaskCounter.loadThenWrappingIncrement(ordering: .relaxed), { do { promise.succeed(try task()) } catch let err { @@ -317,7 +318,7 @@ Further information: @inlinable internal func execute(_ task: @escaping () -> Void) { // nothing we can do if we fail enqueuing here. - try? self._schedule0(ScheduledTask(id: self.scheduledTaskCounter.add(1), task, { error in + try? self._schedule0(ScheduledTask(id: self.scheduledTaskCounter.loadThenWrappingIncrement(ordering: .relaxed), task, { error in // do nothing }, .now())) } diff --git a/Sources/NIOTestUtils/EventCounterHandler.swift b/Sources/NIOTestUtils/EventCounterHandler.swift index a37a8b5045c..6eea5f613c3 100644 --- a/Sources/NIOTestUtils/EventCounterHandler.swift +++ b/Sources/NIOTestUtils/EventCounterHandler.swift @@ -14,6 +14,11 @@ import NIOCore import NIOConcurrencyHelpers +#if compiler(>=5.6) +@preconcurrency import Atomics +#else +import Atomics +#endif /// `EventCounterHandler` is a `ChannelHandler` that counts and forwards all the events that it sees coming through /// the `ChannelPipeline`. @@ -23,24 +28,24 @@ import NIOConcurrencyHelpers /// /// - note: Contrary to most `ChannelHandler`s, all of `EventCounterHandler`'s API is thread-safe meaning that you can /// query the events received from any thread. -public final class EventCounterHandler: NIOSendable { - private let _channelRegisteredCalls = NIOAtomic.makeAtomic(value: 0) - private let _channelUnregisteredCalls = NIOAtomic.makeAtomic(value: 0) - private let _channelActiveCalls = NIOAtomic.makeAtomic(value: 0) - private let _channelInactiveCalls = NIOAtomic.makeAtomic(value: 0) - private let _channelReadCalls = NIOAtomic.makeAtomic(value: 0) - private let _channelReadCompleteCalls = NIOAtomic.makeAtomic(value: 0) - private let _channelWritabilityChangedCalls = NIOAtomic.makeAtomic(value: 0) - private let _userInboundEventTriggeredCalls = NIOAtomic.makeAtomic(value: 0) - private let _errorCaughtCalls = NIOAtomic.makeAtomic(value: 0) - private let _registerCalls = NIOAtomic.makeAtomic(value: 0) - private let _bindCalls = NIOAtomic.makeAtomic(value: 0) - private let _connectCalls = NIOAtomic.makeAtomic(value: 0) - private let _writeCalls = NIOAtomic.makeAtomic(value: 0) - private let _flushCalls = NIOAtomic.makeAtomic(value: 0) - private let _readCalls = NIOAtomic.makeAtomic(value: 0) - private let _closeCalls = NIOAtomic.makeAtomic(value: 0) - private let _triggerUserOutboundEventCalls = NIOAtomic.makeAtomic(value: 0) +public final class EventCounterHandler { + private let _channelRegisteredCalls = ManagedAtomic(0) + private let _channelUnregisteredCalls = ManagedAtomic(0) + private let _channelActiveCalls = ManagedAtomic(0) + private let _channelInactiveCalls = ManagedAtomic(0) + private let _channelReadCalls = ManagedAtomic(0) + private let _channelReadCompleteCalls = ManagedAtomic(0) + private let _channelWritabilityChangedCalls = ManagedAtomic(0) + private let _userInboundEventTriggeredCalls = ManagedAtomic(0) + private let _errorCaughtCalls = ManagedAtomic(0) + private let _registerCalls = ManagedAtomic(0) + private let _bindCalls = ManagedAtomic(0) + private let _connectCalls = ManagedAtomic(0) + private let _writeCalls = ManagedAtomic(0) + private let _flushCalls = ManagedAtomic(0) + private let _readCalls = ManagedAtomic(0) + private let _closeCalls = ManagedAtomic(0) + private let _triggerUserOutboundEventCalls = ManagedAtomic(0) public init() {} } @@ -57,87 +62,87 @@ extension EventCounterHandler { /// Returns the number of `channelRegistered` events seen so far in the `ChannelPipeline`. public var channelRegisteredCalls: Int { - return self._channelRegisteredCalls.load() + return self._channelRegisteredCalls.load(ordering: .relaxed) } /// Returns the number of `channelUnregistered` events seen so far in the `ChannelPipeline`. public var channelUnregisteredCalls: Int { - return self._channelUnregisteredCalls.load() + return self._channelUnregisteredCalls.load(ordering: .relaxed) } /// Returns the number of `channelActive` events seen so far in the `ChannelPipeline`. public var channelActiveCalls: Int { - return self._channelActiveCalls.load() + return self._channelActiveCalls.load(ordering: .relaxed) } /// Returns the number of `channelInactive` events seen so far in the `ChannelPipeline`. public var channelInactiveCalls: Int { - return self._channelInactiveCalls.load() + return self._channelInactiveCalls.load(ordering: .relaxed) } /// Returns the number of `channelRead` events seen so far in the `ChannelPipeline`. public var channelReadCalls: Int { - return self._channelReadCalls.load() + return self._channelReadCalls.load(ordering: .relaxed) } /// Returns the number of `channelReadComplete` events seen so far in the `ChannelPipeline`. public var channelReadCompleteCalls: Int { - return self._channelReadCompleteCalls.load() + return self._channelReadCompleteCalls.load(ordering: .relaxed) } /// Returns the number of `channelWritabilityChanged` events seen so far in the `ChannelPipeline`. public var channelWritabilityChangedCalls: Int { - return self._channelWritabilityChangedCalls.load() + return self._channelWritabilityChangedCalls.load(ordering: .relaxed) } /// Returns the number of `userInboundEventTriggered` events seen so far in the `ChannelPipeline`. public var userInboundEventTriggeredCalls: Int { - return self._userInboundEventTriggeredCalls.load() + return self._userInboundEventTriggeredCalls.load(ordering: .relaxed) } /// Returns the number of `errorCaught` events seen so far in the `ChannelPipeline`. public var errorCaughtCalls: Int { - return self._errorCaughtCalls.load() + return self._errorCaughtCalls.load(ordering: .relaxed) } /// Returns the number of `register` events seen so far in the `ChannelPipeline`. public var registerCalls: Int { - return self._registerCalls.load() + return self._registerCalls.load(ordering: .relaxed) } /// Returns the number of `bind` events seen so far in the `ChannelPipeline`. public var bindCalls: Int { - return self._bindCalls.load() + return self._bindCalls.load(ordering: .relaxed) } /// Returns the number of `connect` events seen so far in the `ChannelPipeline`. public var connectCalls: Int { - return self._connectCalls.load() + return self._connectCalls.load(ordering: .relaxed) } /// Returns the number of `write` events seen so far in the `ChannelPipeline`. public var writeCalls: Int { - return self._writeCalls.load() + return self._writeCalls.load(ordering: .relaxed) } /// Returns the number of `flush` events seen so far in the `ChannelPipeline`. public var flushCalls: Int { - return self._flushCalls.load() + return self._flushCalls.load(ordering: .relaxed) } /// Returns the number of `read` events seen so far in the `ChannelPipeline`. public var readCalls: Int { - return self._readCalls.load() + return self._readCalls.load(ordering: .relaxed) } /// Returns the number of `close` events seen so far in the `ChannelPipeline`. public var closeCalls: Int { - return self._closeCalls.load() + return self._closeCalls.load(ordering: .relaxed) } /// Returns the number of `triggerUserOutboundEvent` events seen so far in the `ChannelPipeline`. public var triggerUserOutboundEventCalls: Int { - return self._triggerUserOutboundEventCalls.load() + return self._triggerUserOutboundEventCalls.load(ordering: .relaxed) } /// Validate some basic assumptions about the number of events and if any of those assumptions are violated, throw @@ -264,103 +269,111 @@ extension EventCounterHandler: ChannelDuplexHandler { /// @see: `_ChannelInboundHandler.channelRegistered` public func channelRegistered(context: ChannelHandlerContext) { - self._channelRegisteredCalls.add(1) + self._channelRegisteredCalls.wrappingIncrement(ordering: .relaxed) context.fireChannelRegistered() } /// @see: `_ChannelInboundHandler.channelUnregistered` public func channelUnregistered(context: ChannelHandlerContext) { - self._channelUnregisteredCalls.add(1) + self._channelUnregisteredCalls.wrappingIncrement(ordering: .relaxed) context.fireChannelUnregistered() } /// @see: `_ChannelInboundHandler.channelActive` public func channelActive(context: ChannelHandlerContext) { - self._channelActiveCalls.add(1) + self._channelActiveCalls.wrappingIncrement(ordering: .relaxed) context.fireChannelActive() } /// @see: `_ChannelInboundHandler.channelInactive` public func channelInactive(context: ChannelHandlerContext) { - self._channelInactiveCalls.add(1) + self._channelInactiveCalls.wrappingIncrement(ordering: .relaxed) context.fireChannelInactive() } /// @see: `_ChannelInboundHandler.channelRead` public func channelRead(context: ChannelHandlerContext, data: NIOAny) { - self._channelReadCalls.add(1) + self._channelReadCalls.wrappingIncrement(ordering: .relaxed) context.fireChannelRead(data) } /// @see: `_ChannelInboundHandler.channelReadComplete` public func channelReadComplete(context: ChannelHandlerContext) { - self._channelReadCompleteCalls.add(1) + self._channelReadCompleteCalls.wrappingIncrement(ordering: .relaxed) context.fireChannelReadComplete() } /// @see: `_ChannelInboundHandler.channelWritabilityChanged` public func channelWritabilityChanged(context: ChannelHandlerContext) { - self._channelWritabilityChangedCalls.add(1) + self._channelWritabilityChangedCalls.wrappingIncrement(ordering: .relaxed) context.fireChannelWritabilityChanged() } /// @see: `_ChannelInboundHandler.userInboundEventTriggered` public func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { - self._userInboundEventTriggeredCalls.add(1) + self._userInboundEventTriggeredCalls.wrappingIncrement(ordering: .relaxed) context.fireUserInboundEventTriggered(event) } /// @see: `_ChannelInboundHandler.errorCaught` public func errorCaught(context: ChannelHandlerContext, error: Error) { - self._errorCaughtCalls.add(1) + self._errorCaughtCalls.wrappingIncrement(ordering: .relaxed) context.fireErrorCaught(error) } /// @see: `_ChannelOutboundHandler.register` public func register(context: ChannelHandlerContext, promise: EventLoopPromise?) { - self._registerCalls.add(1) + self._registerCalls.wrappingIncrement(ordering: .relaxed) context.register(promise: promise) } /// @see: `_ChannelOutboundHandler.bind` public func bind(context: ChannelHandlerContext, to: SocketAddress, promise: EventLoopPromise?) { - self._bindCalls.add(1) + self._bindCalls.wrappingIncrement(ordering: .relaxed) context.bind(to: to, promise: promise) } /// @see: `_ChannelOutboundHandler.connect` public func connect(context: ChannelHandlerContext, to: SocketAddress, promise: EventLoopPromise?) { - self._connectCalls.add(1) + self._connectCalls.wrappingIncrement(ordering: .relaxed) context.connect(to: to, promise: promise) } /// @see: `_ChannelOutboundHandler.write` public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { - self._writeCalls.add(1) + self._writeCalls.wrappingIncrement(ordering: .relaxed) context.write(data, promise: promise) } /// @see: `_ChannelOutboundHandler.flush` public func flush(context: ChannelHandlerContext) { - self._flushCalls.add(1) + self._flushCalls.wrappingIncrement(ordering: .relaxed) context.flush() } /// @see: `_ChannelOutboundHandler.read` public func read(context: ChannelHandlerContext) { - self._readCalls.add(1) + self._readCalls.wrappingIncrement(ordering: .relaxed) context.read() } /// @see: `_ChannelOutboundHandler.close` public func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise?) { - self._closeCalls.add(1) + self._closeCalls.wrappingIncrement(ordering: .relaxed) context.close(mode: mode, promise: promise) } /// @see: `_ChannelOutboundHandler.triggerUserOutboundEvent` public func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise?) { - self._triggerUserOutboundEventCalls.add(1) + self._triggerUserOutboundEventCalls.wrappingIncrement(ordering: .relaxed) context.triggerUserOutboundEvent(event, promise: promise) } } + +#if compiler(>=5.6) && canImport(_Concurrency) +// This is a workaround before ManagedAtomic gets Sendable conformance. Once the support +// is ready, we should remove '@preconcurrency import' and declare NIOSendable directly. +extension EventCounterHandler: Sendable { + +} +#endif diff --git a/Tests/NIOConcurrencyHelpersTests/NIOConcurrencyHelpersTests.swift b/Tests/NIOConcurrencyHelpersTests/NIOConcurrencyHelpersTests.swift index b33dbe8f168..e7d839911a4 100644 --- a/Tests/NIOConcurrencyHelpersTests/NIOConcurrencyHelpersTests.swift +++ b/Tests/NIOConcurrencyHelpersTests/NIOConcurrencyHelpersTests.swift @@ -239,6 +239,7 @@ class NIOConcurrencyHelpersTests: XCTestCase { testFor(UInt.self) } + @available(*, deprecated, message: "deprecated because it tests deprecated functionality") func testLargeContendedNIOAtomicSum() { let noAsyncs: UInt64 = 50 let noCounts: UInt64 = 2_000 @@ -267,6 +268,7 @@ class NIOConcurrencyHelpersTests: XCTestCase { XCTAssertEqual(sumOfIntegers(until: noAsyncs) * noCounts, ai.load()) } + @available(*, deprecated, message: "deprecated because it tests deprecated functionality") func testCompareAndExchangeBoolNIOAtomic() { let ab = NIOAtomic.makeAtomic(value: true) @@ -280,6 +282,7 @@ class NIOConcurrencyHelpersTests: XCTestCase { XCTAssertTrue(ab.compareAndExchange(expected: false, desired: true)) } + @available(*, deprecated, message: "deprecated because it tests deprecated functionality") func testAllOperationsBoolNIOAtomic() { let ab = NIOAtomic.makeAtomic(value: false) XCTAssertEqual(false, ab.load()) @@ -296,6 +299,7 @@ class NIOConcurrencyHelpersTests: XCTestCase { XCTAssertFalse(ab.compareAndExchange(expected: false, desired: true)) } + @available(*, deprecated, message: "deprecated because it tests deprecated functionality") func testCompareAndExchangeUIntsNIOAtomic() { func testFor(_ value: T.Type) { let zero: T = 0 @@ -326,6 +330,7 @@ class NIOConcurrencyHelpersTests: XCTestCase { testFor(UInt.self) } + @available(*, deprecated, message: "deprecated because it tests deprecated functionality") func testCompareAndExchangeIntsNIOAtomic() { func testFor(_ value: T.Type) { let zero: T = 0 @@ -357,6 +362,7 @@ class NIOConcurrencyHelpersTests: XCTestCase { testFor(Int.self) } + @available(*, deprecated, message: "deprecated because it tests deprecated functionality") func testAddSubNIOAtomic() { func testFor(_ value: T.Type) { let zero: T = 0 @@ -395,6 +401,7 @@ class NIOConcurrencyHelpersTests: XCTestCase { testFor(UInt.self) } + @available(*, deprecated, message: "deprecated because it tests deprecated functionality") func testExchangeNIOAtomic() { func testFor(_ value: T.Type) { let zero: T = 0 @@ -426,6 +433,7 @@ class NIOConcurrencyHelpersTests: XCTestCase { testFor(UInt.self) } + @available(*, deprecated, message: "deprecated because it tests deprecated functionality") func testLoadStoreNIOAtomic() { func testFor(_ value: T.Type) { let zero: T = 0 @@ -1039,6 +1047,7 @@ func assert(_ condition: @autoclosure () -> Bool, within time: TimeAmount, testI } } +@available(*, deprecated, message: "deprecated because it is used to test deprecated functionality") fileprivate class IntHolderWithDeallocationTracking { private(set) var value: Int let allDeallocations: NIOAtomic diff --git a/Tests/NIOEmbeddedTests/AsyncEmbeddedEventLoopTests.swift b/Tests/NIOEmbeddedTests/AsyncEmbeddedEventLoopTests.swift index 69e074eca76..d743717320b 100644 --- a/Tests/NIOEmbeddedTests/AsyncEmbeddedEventLoopTests.swift +++ b/Tests/NIOEmbeddedTests/AsyncEmbeddedEventLoopTests.swift @@ -12,9 +12,14 @@ // //===----------------------------------------------------------------------===// import NIOCore -import NIOConcurrencyHelpers @testable import NIOEmbedded import XCTest +import NIOConcurrencyHelpers +#if compiler(>=5.6) +@preconcurrency import Atomics +#else +import Atomics +#endif private class EmbeddedTestError: Error { } @@ -23,14 +28,14 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { #if compiler(>=5.5.2) && canImport(_Concurrency) guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { - let callbackRan = NIOAtomic.makeAtomic(value: false) + let callbackRan = ManagedAtomic(false) let loop = NIOAsyncEmbeddedEventLoop() try await loop.executeInContext { - loop.execute { callbackRan.store(true) } - XCTAssertFalse(callbackRan.load()) + loop.execute { callbackRan.store(true, ordering: .relaxed) } + XCTAssertFalse(callbackRan.load(ordering: .relaxed)) } await loop.run() - XCTAssertTrue(callbackRan.load()) + XCTAssertTrue(callbackRan.load(ordering: .relaxed)) } #else throw XCTSkip() @@ -41,18 +46,18 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { #if compiler(>=5.5.2) && canImport(_Concurrency) guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { - let runCount = NIOAtomic.makeAtomic(value: 0) + let runCount = ManagedAtomic(0) let loop = NIOAsyncEmbeddedEventLoop() - loop.execute { runCount.add(1) } - loop.execute { runCount.add(1) } - loop.execute { runCount.add(1) } + loop.execute { runCount.wrappingIncrement(ordering: .relaxed) } + loop.execute { runCount.wrappingIncrement(ordering: .relaxed) } + loop.execute { runCount.wrappingIncrement(ordering: .relaxed) } try await loop.executeInContext { - XCTAssertEqual(runCount.load(), 0) + XCTAssertEqual(runCount.load(ordering: .relaxed), 0) } await loop.run() try await loop.executeInContext { - XCTAssertEqual(runCount.load(), 3) + XCTAssertEqual(runCount.load(ordering: .relaxed), 3) } } #else @@ -64,31 +69,31 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { #if compiler(>=5.5.2) && canImport(_Concurrency) guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { - let sentinel = NIOAtomic.makeAtomic(value: 0) + let sentinel = ManagedAtomic(0) let loop = NIOAsyncEmbeddedEventLoop() loop.execute { // This should execute first. - XCTAssertEqual(sentinel.load(), 0) - sentinel.store(1) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 0) + sentinel.store(1, ordering: .relaxed) loop.execute { // This should execute third. - XCTAssertEqual(sentinel.load(), 2) - sentinel.store(3) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 2) + sentinel.store(3, ordering: .relaxed) } } loop.execute { // This should execute second. - XCTAssertEqual(sentinel.load(), 1) - sentinel.store(2) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 1) + sentinel.store(2, ordering: .relaxed) } try await loop.executeInContext { - XCTAssertEqual(sentinel.load(), 0) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 0) } await loop.run() try await loop.executeInContext { - XCTAssertEqual(sentinel.load(), 3) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 3) } } #else @@ -100,22 +105,22 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { #if compiler(>=5.5.2) && canImport(_Concurrency) guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { - let callbackRan = NIOAtomic.makeAtomic(value: false) + let callbackRan = ManagedAtomic(false) let loop = NIOAsyncEmbeddedEventLoop() - loop.execute { callbackRan.store(true) } + loop.execute { callbackRan.store(true, ordering: .relaxed) } try await loop.executeInContext { - XCTAssertFalse(callbackRan.load()) + XCTAssertFalse(callbackRan.load(ordering: .relaxed)) } await loop.run() - loop.execute { callbackRan.store(false) } + loop.execute { callbackRan.store(false, ordering: .relaxed) } try await loop.executeInContext { - XCTAssertTrue(callbackRan.load()) + XCTAssertTrue(callbackRan.load(ordering: .relaxed)) } await loop.run() try await loop.executeInContext { - XCTAssertFalse(callbackRan.load()) + XCTAssertFalse(callbackRan.load(ordering: .relaxed)) } } #else @@ -127,16 +132,16 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { #if compiler(>=5.5.2) && canImport(_Concurrency) guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { - let callbackRan = NIOAtomic.makeAtomic(value: false) + let callbackRan = ManagedAtomic(false) let loop = NIOAsyncEmbeddedEventLoop() - loop.execute { callbackRan.store(true) } + loop.execute { callbackRan.store(true, ordering: .relaxed) } try await loop.executeInContext { - XCTAssertFalse(callbackRan.load()) + XCTAssertFalse(callbackRan.load(ordering: .relaxed)) } XCTAssertNoThrow(try loop.syncShutdownGracefully()) try await loop.executeInContext { - XCTAssertTrue(callbackRan.load()) + XCTAssertTrue(callbackRan.load(ordering: .relaxed)) } } #else @@ -148,16 +153,16 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { #if compiler(>=5.5.2) && canImport(_Concurrency) guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { - let callbackRan = NIOAtomic.makeAtomic(value: false) + let callbackRan = ManagedAtomic(false) let loop = NIOAsyncEmbeddedEventLoop() - loop.execute { callbackRan.store(true) } + loop.execute { callbackRan.store(true, ordering: .relaxed) } try await loop.executeInContext { - XCTAssertFalse(callbackRan.load()) + XCTAssertFalse(callbackRan.load(ordering: .relaxed)) } await loop.shutdownGracefully() try await loop.executeInContext { - XCTAssertTrue(callbackRan.load()) + XCTAssertTrue(callbackRan.load(ordering: .relaxed)) } } #else @@ -169,30 +174,30 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { #if compiler(>=5.5.2) && canImport(_Concurrency) guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { - let callbackCount = NIOAtomic.makeAtomic(value: 0) + let callbackCount = ManagedAtomic(0) let loop = NIOAsyncEmbeddedEventLoop() _ = loop.scheduleTask(in: .nanoseconds(5)) { - callbackCount.add(1) + callbackCount.loadThenWrappingIncrement(ordering: .relaxed) } try await loop.executeInContext { - XCTAssertEqual(callbackCount.load(), 0) + XCTAssertEqual(callbackCount.load(ordering: .relaxed), 0) } await loop.advanceTime(by: .nanoseconds(4)) try await loop.executeInContext { - XCTAssertEqual(callbackCount.load(), 0) + XCTAssertEqual(callbackCount.load(ordering: .relaxed), 0) } await loop.advanceTime(by: .nanoseconds(1)) try await loop.executeInContext { - XCTAssertEqual(callbackCount.load(), 1) + XCTAssertEqual(callbackCount.load(ordering: .relaxed), 1) } await loop.advanceTime(by: .nanoseconds(1)) try await loop.executeInContext { - XCTAssertEqual(callbackCount.load(), 1) + XCTAssertEqual(callbackCount.load(ordering: .relaxed), 1) } await loop.advanceTime(by: .hours(1)) try await loop.executeInContext { - XCTAssertEqual(callbackCount.load(), 1) + XCTAssertEqual(callbackCount.load(ordering: .relaxed), 1) } } #else @@ -204,21 +209,21 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { #if compiler(>=5.5.2) && canImport(_Concurrency) guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { - let sentinel = NIOAtomic.makeAtomic(value: 0) + let sentinel = ManagedAtomic(0) let loop = NIOAsyncEmbeddedEventLoop() for index in 1...10 { _ = loop.scheduleTask(in: .nanoseconds(Int64(index))) { - sentinel.store(index) + sentinel.store(index, ordering: .relaxed) } } for val in 1...10 { try await loop.executeInContext { - XCTAssertEqual(sentinel.load(), val - 1) + XCTAssertEqual(sentinel.load(ordering: .relaxed), val - 1) } await loop.advanceTime(by: .nanoseconds(1)) try await loop.executeInContext { - XCTAssertEqual(sentinel.load(), val) + XCTAssertEqual(sentinel.load(ordering: .relaxed), val) } } } @@ -231,22 +236,22 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { #if compiler(>=5.5.2) && canImport(_Concurrency) guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { - let sentinel = NIOAtomic.makeAtomic(value: 0) + let sentinel = ManagedAtomic(0) let loop = NIOAsyncEmbeddedEventLoop() _ = loop.scheduleTask(in: .nanoseconds(5)) { - sentinel.store(1) + sentinel.store(1, ordering: .relaxed) loop.execute { - sentinel.store(2) + sentinel.store(2, ordering: .relaxed) } } await loop.advanceTime(by: .nanoseconds(4)) try await loop.executeInContext { - XCTAssertEqual(sentinel.load(), 0) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 0) } await loop.advanceTime(by: .nanoseconds(1)) try await loop.executeInContext { - XCTAssertEqual(sentinel.load(), 2) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 2) } } #else @@ -258,41 +263,41 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { #if compiler(>=5.5.2) && canImport(_Concurrency) guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { - let sentinel = NIOAtomic.makeAtomic(value: 0) + let sentinel = ManagedAtomic(0) let loop = NIOAsyncEmbeddedEventLoop() _ = loop.scheduleTask(in: .nanoseconds(5)) { - sentinel.store(1) + sentinel.store(1, ordering: .relaxed) _ = loop.scheduleTask(in: .nanoseconds(3)) { - sentinel.store(2) + sentinel.store(2, ordering: .relaxed) } _ = loop.scheduleTask(in: .nanoseconds(5)) { - sentinel.store(3) + sentinel.store(3, ordering: .relaxed) } } await loop.advanceTime(by: .nanoseconds(4)) try await loop.executeInContext { - XCTAssertEqual(sentinel.load(), 0) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 0) } await loop.advanceTime(by: .nanoseconds(1)) try await loop.executeInContext { - XCTAssertEqual(sentinel.load(), 1) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 1) } await loop.advanceTime(by: .nanoseconds(2)) try await loop.executeInContext { - XCTAssertEqual(sentinel.load(), 1) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 1) } await loop.advanceTime(by: .nanoseconds(1)) try await loop.executeInContext { - XCTAssertEqual(sentinel.load(), 2) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 2) } await loop.advanceTime(by: .nanoseconds(1)) try await loop.executeInContext { - XCTAssertEqual(sentinel.load(), 2) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 2) } await loop.advanceTime(by: .nanoseconds(1)) try await loop.executeInContext { - XCTAssertEqual(sentinel.load(), 3) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 3) } } #else @@ -304,20 +309,20 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { #if compiler(>=5.5.2) && canImport(_Concurrency) guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { - let sentinel = NIOAtomic.makeAtomic(value: 0) + let sentinel = ManagedAtomic(0) let loop = NIOAsyncEmbeddedEventLoop() loop.execute { - XCTAssertEqual(sentinel.load(), 0) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 0) _ = loop.scheduleTask(in: .nanoseconds(5)) { - XCTAssertEqual(sentinel.load(), 1) - sentinel.store(2) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 1) + sentinel.store(2, ordering: .relaxed) } - loop.execute { sentinel.store(1) } + loop.execute { sentinel.store(1, ordering: .relaxed) } } await loop.advanceTime(by: .nanoseconds(5)) try await loop.executeInContext { - XCTAssertEqual(sentinel.load(), 2) + XCTAssertEqual(sentinel.load(ordering: .relaxed), 2) } } #else @@ -346,15 +351,15 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { #if compiler(>=5.5.2) && canImport(_Concurrency) guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { - let fired = NIOAtomic.makeAtomic(value: false) + let fired = ManagedAtomic(false) let loop = NIOAsyncEmbeddedEventLoop() let task = loop.scheduleTask(in: .nanoseconds(5)) { true } - task.futureResult.whenSuccess { fired.store($0) } + task.futureResult.whenSuccess { fired.store($0, ordering: .relaxed) } await loop.advanceTime(by: .nanoseconds(4)) - XCTAssertFalse(fired.load()) + XCTAssertFalse(fired.load(ordering: .relaxed)) await loop.advanceTime(by: .nanoseconds(1)) - XCTAssertTrue(fired.load()) + XCTAssertTrue(fired.load(ordering: .relaxed)) } #else throw XCTSkip() @@ -366,7 +371,7 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { let err = EmbeddedTestError() - let fired = NIOAtomic.makeAtomic(value: false) + let fired = ManagedAtomic(false) let loop = NIOAsyncEmbeddedEventLoop() let task = loop.scheduleTask(in: .nanoseconds(5)) { throw err @@ -376,13 +381,13 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { }.recover { caughtErr in XCTAssertTrue(err === caughtErr as? EmbeddedTestError) }.whenComplete { (_: Result) in - fired.store(true) + fired.store(true, ordering: .relaxed) } await loop.advanceTime(by: .nanoseconds(4)) - XCTAssertFalse(fired.load()) + XCTAssertFalse(fired.load(ordering: .relaxed)) await loop.advanceTime(by: .nanoseconds(1)) - XCTAssertTrue(fired.load()) + XCTAssertTrue(fired.load(ordering: .relaxed)) } #else throw XCTSkip() @@ -401,7 +406,7 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { let lock = Lock() var firstScheduled: Scheduled? = nil var secondScheduled: Scheduled? = nil - let orderingCounter = NIOAtomic.makeAtomic(value: 0) + let orderingCounter = ManagedAtomic(0) // Here's the setup. First, we'll set up two scheduled tasks to fire in 5 nanoseconds. Each of these // will attempt to cancel the other, but the first one scheduled will fire first. Additionally, each will execute{} a single @@ -469,7 +474,7 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { await loop.advanceTime(by: .nanoseconds(10)) // Now the final value should be 6. - XCTAssertEqual(orderingCounter.load(), 6) + XCTAssertEqual(orderingCounter.load(ordering: .relaxed), 6) } #else throw XCTSkip() @@ -533,21 +538,21 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { let eventLoop = NIOAsyncEmbeddedEventLoop() - let tasksRun = NIOAtomic.makeAtomic(value: 0) + let tasksRun = ManagedAtomic(0) let startTime = eventLoop.now eventLoop.scheduleTask(in: .nanoseconds(3141592)) { XCTAssertEqual(eventLoop.now, startTime + .nanoseconds(3141592)) - tasksRun.add(1) + tasksRun.wrappingIncrement(ordering: .relaxed) } eventLoop.scheduleTask(in: .seconds(3141592)) { XCTAssertEqual(eventLoop.now, startTime + .seconds(3141592)) - tasksRun.add(1) + tasksRun.wrappingIncrement(ordering: .relaxed) } await eventLoop.shutdownGracefully() - XCTAssertEqual(tasksRun.load(), 2) + XCTAssertEqual(tasksRun.load(ordering: .relaxed), 2) } #else throw XCTSkip() @@ -559,18 +564,18 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { let eventLoop = NIOAsyncEmbeddedEventLoop() - let tasksRun = NIOAtomic.makeAtomic(value: 0) + let tasksRun = ManagedAtomic(0) func scheduleNowAndIncrement() { eventLoop.scheduleTask(in: .nanoseconds(0)) { - tasksRun.add(1) + tasksRun.wrappingIncrement(ordering: .relaxed) scheduleNowAndIncrement() } } scheduleNowAndIncrement() await eventLoop.shutdownGracefully() - XCTAssertEqual(tasksRun.load(), 1) + XCTAssertEqual(tasksRun.load(ordering: .relaxed), 1) } #else throw XCTSkip() @@ -584,13 +589,13 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { let eventLoop = NIOAsyncEmbeddedEventLoop() let deadline = NIODeadline.uptimeNanoseconds(0) + .seconds(42) - let tasksRun = NIOAtomic.makeAtomic(value: 0) + let tasksRun = ManagedAtomic(0) eventLoop.scheduleTask(deadline: deadline) { - tasksRun.add(1) + tasksRun.loadThenWrappingIncrement(ordering: .relaxed) } await eventLoop.advanceTime(to: deadline) - XCTAssertEqual(tasksRun.load(), 1) + XCTAssertEqual(tasksRun.load(ordering: .relaxed), 1) } #else throw XCTSkip() @@ -603,22 +608,22 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { XCTAsyncTest { let eventLoop = NIOAsyncEmbeddedEventLoop() - let tasksRun = NIOAtomic.makeAtomic(value: 0) + let tasksRun = ManagedAtomic(0) eventLoop.scheduleTask(deadline: .uptimeNanoseconds(0) + .seconds(42)) { - tasksRun.add(1) + tasksRun.loadThenWrappingIncrement(ordering: .relaxed) } // t=40s await eventLoop.advanceTime(to: .uptimeNanoseconds(0) + .seconds(40)) - XCTAssertEqual(tasksRun.load(), 0) + XCTAssertEqual(tasksRun.load(ordering: .relaxed), 0) // t=40s (still) await eventLoop.advanceTime(to: .distantPast) - XCTAssertEqual(tasksRun.load(), 0) + XCTAssertEqual(tasksRun.load(ordering: .relaxed), 0) // t=42s await eventLoop.advanceTime(by: .seconds(2)) - XCTAssertEqual(tasksRun.load(), 1) + XCTAssertEqual(tasksRun.load(ordering: .relaxed), 1) } #else throw XCTSkip() @@ -630,25 +635,25 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { let eventLoop = NIOAsyncEmbeddedEventLoop() - let counter = NIOAtomic.makeAtomic(value: 0) + let counter = ManagedAtomic(0) eventLoop.execute { - let original = counter.add(1) + let original = counter.loadThenWrappingIncrement(ordering: .relaxed) XCTAssertEqual(original, 0) } eventLoop.execute { - let original = counter.add(1) + let original = counter.loadThenWrappingIncrement(ordering: .relaxed) XCTAssertEqual(original, 1) } eventLoop.execute { - let original = counter.add(1) + let original = counter.loadThenWrappingIncrement(ordering: .relaxed) XCTAssertEqual(original, 2) } await eventLoop.run() - XCTAssertEqual(counter.load(), 3) + XCTAssertEqual(counter.load(ordering: .relaxed), 3) } #else throw XCTSkip() @@ -660,25 +665,25 @@ final class NIOAsyncEmbeddedEventLoopTests: XCTestCase { guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { throw XCTSkip() } XCTAsyncTest { let eventLoop = NIOAsyncEmbeddedEventLoop() - let counter = NIOAtomic.makeAtomic(value: 0) + let counter = ManagedAtomic(0) eventLoop.scheduleTask(in: .seconds(1)) { - let original = counter.add(1) + let original = counter.loadThenWrappingIncrement(ordering: .relaxed) XCTAssertEqual(original, 1) } eventLoop.scheduleTask(in: .milliseconds(1)) { - let original = counter.add(1) + let original = counter.loadThenWrappingIncrement(ordering: .relaxed) XCTAssertEqual(original, 0) } eventLoop.scheduleTask(in: .seconds(1)) { - let original = counter.add(1) + let original = counter.loadThenWrappingIncrement(ordering: .relaxed) XCTAssertEqual(original, 2) } await eventLoop.advanceTime(by: .seconds(1)) - XCTAssertEqual(counter.load(), 3) + XCTAssertEqual(counter.load(ordering: .relaxed), 3) } #else throw XCTSkip() diff --git a/Tests/NIOEmbeddedTests/TestUtils.swift b/Tests/NIOEmbeddedTests/TestUtils.swift index bd109166509..3357b0a7211 100644 --- a/Tests/NIOEmbeddedTests/TestUtils.swift +++ b/Tests/NIOEmbeddedTests/TestUtils.swift @@ -11,6 +11,7 @@ // SPDX-License-Identifier: Apache-2.0 // //===----------------------------------------------------------------------===// +import Atomics import Foundation import XCTest import NIOCore @@ -60,13 +61,13 @@ extension EventLoopFuture { } } -internal func XCTAssertCompareAndSwapSucceeds( - storage: NIOAtomic, +internal func XCTAssertCompareAndSwapSucceeds( + storage: ManagedAtomic, expected: Type, desired: Type, file: StaticString = #file, line: UInt = #line ) { - let swapped = storage.compareAndExchange(expected: expected, desired: desired) - XCTAssertTrue(swapped, file: file, line: line) + let result = storage.compareExchange(expected: expected, desired: desired, ordering: .relaxed) + XCTAssertTrue(result.exchanged, file: file, line: line) } diff --git a/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift b/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift index ad7b8f9b942..015c6382ed8 100644 --- a/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift +++ b/Tests/NIOPosixTests/AcceptBackoffHandlerTest.swift @@ -15,7 +15,7 @@ import XCTest import NIOCore @testable import NIOPosix -import NIOConcurrencyHelpers +import Atomics public final class AcceptBackoffHandlerTest: XCTestCase { @@ -204,9 +204,9 @@ public final class AcceptBackoffHandlerTest: XCTestCase { let readCountHandler = ReadCountHandler() - let backoffProviderCalled = NIOAtomic.makeAtomic(value: 0) + let backoffProviderCalled = ManagedAtomic(0) let serverChannel = try setupChannel(group: group, readCountHandler: readCountHandler, backoffProvider: { err in - if backoffProviderCalled.add(1) == 0 { + if backoffProviderCalled.loadThenWrappingIncrement(ordering: .relaxed) == 0 { return .seconds(1) } return .seconds(2) @@ -234,7 +234,7 @@ public final class AcceptBackoffHandlerTest: XCTestCase { XCTAssertNoThrow(try serverChannel.syncCloseAcceptingAlreadyClosed()) - XCTAssertEqual(2, backoffProviderCalled.load()) + XCTAssertEqual(2, backoffProviderCalled.load(ordering: .relaxed)) } private final class ReadCountHandler: ChannelOutboundHandler { diff --git a/Tests/NIOPosixTests/ChannelPipelineTest.swift b/Tests/NIOPosixTests/ChannelPipelineTest.swift index cefce98941b..1917cf7eac6 100644 --- a/Tests/NIOPosixTests/ChannelPipelineTest.swift +++ b/Tests/NIOPosixTests/ChannelPipelineTest.swift @@ -13,7 +13,7 @@ //===----------------------------------------------------------------------===// import XCTest -import NIOConcurrencyHelpers +import Atomics @testable import NIOCore import NIOEmbedded import NIOPosix @@ -148,8 +148,8 @@ class ChannelPipelineTest: XCTestCase { let handler = DummyHandler() defer { - XCTAssertFalse(handler.handlerAddedCalled.load()) - XCTAssertFalse(handler.handlerRemovedCalled.load()) + XCTAssertFalse(handler.handlerAddedCalled.load(ordering: .relaxed)) + XCTAssertFalse(handler.handlerRemovedCalled.load(ordering: .relaxed)) } XCTAssertThrowsError(try channel.pipeline.addHandler(handler).wait()) { error in XCTAssertEqual(.ioOnClosedChannel, error as? ChannelError) @@ -157,15 +157,15 @@ class ChannelPipelineTest: XCTestCase { } private final class DummyHandler: ChannelHandler { - let handlerAddedCalled = NIOAtomic.makeAtomic(value: false) - let handlerRemovedCalled = NIOAtomic.makeAtomic(value: false) + let handlerAddedCalled = ManagedAtomic(false) + let handlerRemovedCalled = ManagedAtomic(false) public func handlerAdded(context: ChannelHandlerContext) { - handlerAddedCalled.store(true) + handlerAddedCalled.store(true, ordering: .relaxed) } public func handlerRemoved(context: ChannelHandlerContext) { - handlerRemovedCalled.store(true) + handlerRemovedCalled.store(true, ordering: .relaxed) } } diff --git a/Tests/NIOPosixTests/EchoServerClientTest.swift b/Tests/NIOPosixTests/EchoServerClientTest.swift index 17e8367c6c4..b213e087856 100644 --- a/Tests/NIOPosixTests/EchoServerClientTest.swift +++ b/Tests/NIOPosixTests/EchoServerClientTest.swift @@ -13,7 +13,7 @@ //===----------------------------------------------------------------------===// import XCTest -import NIOConcurrencyHelpers +import Atomics import Dispatch import NIOCore @testable import NIOPosix @@ -352,8 +352,8 @@ class EchoServerClientTest : XCTestCase { private final class CloseInInActiveAndUnregisteredChannelHandler: ChannelInboundHandler { typealias InboundIn = Never - let alreadyClosedInChannelInactive = NIOAtomic.makeAtomic(value: false) - let alreadyClosedInChannelUnregistered = NIOAtomic.makeAtomic(value: false) + let alreadyClosedInChannelInactive = ManagedAtomic(false) + let alreadyClosedInChannelUnregistered = ManagedAtomic(false) let channelUnregisteredPromise: EventLoopPromise let channelInactivePromise: EventLoopPromise @@ -370,7 +370,7 @@ class EchoServerClientTest : XCTestCase { } public func channelInactive(context: ChannelHandlerContext) { - if alreadyClosedInChannelInactive.compareAndExchange(expected: false, desired: true) { + if alreadyClosedInChannelInactive.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged { XCTAssertFalse(self.channelUnregisteredPromise.futureResult.isFulfilled, "channelInactive should fire before channelUnregistered") context.close().map { @@ -390,7 +390,7 @@ class EchoServerClientTest : XCTestCase { } public func channelUnregistered(context: ChannelHandlerContext) { - if alreadyClosedInChannelUnregistered.compareAndExchange(expected: false, desired: true) { + if alreadyClosedInChannelUnregistered.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged { XCTAssertTrue(self.channelInactivePromise.futureResult.isFulfilled, "when channelUnregister fires, channelInactive should already have fired") context.close().map { @@ -459,8 +459,8 @@ class EchoServerClientTest : XCTestCase { _ = try inactivePromise.futureResult.and(unregistredPromise.futureResult).wait() - XCTAssertTrue(handler.alreadyClosedInChannelInactive.load()) - XCTAssertTrue(handler.alreadyClosedInChannelUnregistered.load()) + XCTAssertTrue(handler.alreadyClosedInChannelInactive.load(ordering: .relaxed)) + XCTAssertTrue(handler.alreadyClosedInChannelUnregistered.load(ordering: .relaxed)) } func testFlushOnEmpty() throws { @@ -758,8 +758,8 @@ class EchoServerClientTest : XCTestCase { defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let acceptedRemotePort: NIOAtomic = .makeAtomic(value: -1) - let acceptedLocalPort: NIOAtomic = .makeAtomic(value: -2) + let acceptedRemotePort = ManagedAtomic(-1) + let acceptedLocalPort = ManagedAtomic(-2) let sem = DispatchSemaphore(value: 0) let serverChannel: Channel @@ -767,8 +767,8 @@ class EchoServerClientTest : XCTestCase { serverChannel = try ServerBootstrap(group: group) .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .childChannelInitializer { channel in - acceptedRemotePort.store(channel.remoteAddress?.port ?? -3) - acceptedLocalPort.store(channel.localAddress?.port ?? -4) + acceptedRemotePort.store(channel.remoteAddress?.port ?? -3, ordering: .relaxed) + acceptedLocalPort.store(channel.localAddress?.port ?? -4, ordering: .relaxed) sem.signal() return channel.eventLoop.makeSucceededFuture(()) }.bind(host: host, port: 0).wait() @@ -798,8 +798,8 @@ class EchoServerClientTest : XCTestCase { } sem.wait() XCTAssertEqual(serverChannel.localAddress?.port, clientChannel.remoteAddress?.port) - XCTAssertEqual(acceptedLocalPort.load(), clientChannel.remoteAddress?.port ?? -5) - XCTAssertEqual(acceptedRemotePort.load(), clientChannel.localAddress?.port ?? -6) + XCTAssertEqual(acceptedLocalPort.load(ordering: .relaxed), clientChannel.remoteAddress?.port ?? -5) + XCTAssertEqual(acceptedRemotePort.load(ordering: .relaxed), clientChannel.localAddress?.port ?? -6) } XCTAssertTrue(atLeastOneSucceeded) } diff --git a/Tests/NIOPosixTests/EventLoopTest.swift b/Tests/NIOPosixTests/EventLoopTest.swift index 8b72e885755..6da9b2ff7b1 100644 --- a/Tests/NIOPosixTests/EventLoopTest.swift +++ b/Tests/NIOPosixTests/EventLoopTest.swift @@ -15,6 +15,7 @@ import XCTest @testable import NIOCore import NIOEmbedded @testable import NIOPosix +import Atomics import Dispatch import NIOConcurrencyHelpers @@ -145,14 +146,14 @@ public final class EventLoopTest : XCTestCase { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } - let counter = NIOAtomic.makeAtomic(value: 0) + let counter = ManagedAtomic(0) let loop = eventLoopGroup.next() let allDone = DispatchGroup() allDone.enter() loop.scheduleRepeatedTask(initialDelay: initialDelay, delay: delay) { repeatedTask -> Void in XCTAssertTrue(loop.inEventLoop) - let initialValue = counter.load() - counter.add(1) + let initialValue = counter.load(ordering: .relaxed) + counter.wrappingIncrement(ordering: .relaxed) if initialValue == 0 { XCTAssertTrue(NIODeadline.now() - nanos >= initialDelay) } else if initialValue == count { @@ -163,7 +164,7 @@ public final class EventLoopTest : XCTestCase { allDone.wait() - XCTAssertEqual(counter.load(), count + 1) + XCTAssertEqual(counter.load(ordering: .relaxed), count + 1) XCTAssertTrue(NIODeadline.now() - nanos >= initialDelay + Int64(count) * delay) } @@ -657,12 +658,12 @@ public final class EventLoopTest : XCTestCase { class AssertHandler: ChannelInboundHandler { typealias InboundIn = Any - let groupIsShutdown = NIOAtomic.makeAtomic(value: false) - let removed = NIOAtomic.makeAtomic(value: false) + let groupIsShutdown = ManagedAtomic(false) + let removed = ManagedAtomic(false) public func handlerRemoved(context: ChannelHandlerContext) { - XCTAssertFalse(groupIsShutdown.load()) - XCTAssertTrue(removed.compareAndExchange(expected: false, desired: true)) + XCTAssertFalse(groupIsShutdown.load(ordering: .relaxed)) + XCTAssertTrue(removed.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged) } } @@ -679,16 +680,16 @@ public final class EventLoopTest : XCTestCase { channel.connect(to: serverSocket.localAddress!) } }.wait() as Void) - let closeFutureFulfilledEventually = NIOAtomic.makeAtomic(value: false) + let closeFutureFulfilledEventually = ManagedAtomic(false) XCTAssertFalse(channel.closeFuture.isFulfilled) channel.closeFuture.whenSuccess { - XCTAssertTrue(closeFutureFulfilledEventually.compareAndExchange(expected: false, desired: true)) + XCTAssertTrue(closeFutureFulfilledEventually.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged) } XCTAssertNoThrow(try group.syncShutdownGracefully()) - XCTAssertTrue(assertHandler.groupIsShutdown.compareAndExchange(expected: false, desired: true)) - XCTAssertTrue(assertHandler.removed.load()) + XCTAssertTrue(assertHandler.groupIsShutdown.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged) + XCTAssertTrue(assertHandler.removed.load(ordering: .relaxed)) XCTAssertFalse(channel.isActive) - XCTAssertTrue(closeFutureFulfilledEventually.load()) + XCTAssertTrue(closeFutureFulfilledEventually.load(ordering: .relaxed)) } public func testScheduleMultipleTasks() throws { @@ -1026,11 +1027,11 @@ public final class EventLoopTest : XCTestCase { final class ExecuteSomethingOnEventLoop: ChannelInboundHandler { typealias InboundIn = ByteBuffer - static let numberOfInstances = NIOAtomic.makeAtomic(value: 0) + static let numberOfInstances = ManagedAtomic(0) let groupToNotify: DispatchGroup init(groupToNotify: DispatchGroup) { - XCTAssertEqual(0, ExecuteSomethingOnEventLoop.numberOfInstances.add(1)) + XCTAssertEqual(0, ExecuteSomethingOnEventLoop.numberOfInstances.loadThenWrappingIncrement(ordering: .relaxed)) self.groupToNotify = groupToNotify } @@ -1565,10 +1566,10 @@ fileprivate class EventLoopWithoutPreSucceededFuture: EventLoop { final class EventLoopGroupOf3WithoutAnAnyImplementation: EventLoopGroup { private let eventloops = [EmbeddedEventLoop(), EmbeddedEventLoop(), EmbeddedEventLoop()] - private let nextID = NIOAtomic.makeAtomic(value: 0) + private let nextID = ManagedAtomic(0) func next() -> EventLoop { - return self.eventloops[Int(self.nextID.add(1) % UInt64(self.eventloops.count))] + return self.eventloops[Int(self.nextID.loadThenWrappingIncrement(ordering: .relaxed) % UInt64(self.eventloops.count))] } func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { diff --git a/Tests/NIOPosixTests/NonBlockingFileIOTest.swift b/Tests/NIOPosixTests/NonBlockingFileIOTest.swift index 41a990de59b..72a0689f51b 100644 --- a/Tests/NIOPosixTests/NonBlockingFileIOTest.swift +++ b/Tests/NIOPosixTests/NonBlockingFileIOTest.swift @@ -15,7 +15,7 @@ import XCTest import NIOCore @testable import NIOPosix -import NIOConcurrencyHelpers +import Atomics class NonBlockingFileIOTest: XCTestCase { private var group: EventLoopGroup! @@ -862,19 +862,19 @@ class NonBlockingFileIOTest: XCTestCase { let numberOfChunks = 2_000 XCTAssertNoThrow(try withTemporaryFile(content: String(repeating: "X", count: numberOfChunks)) { (fileHandle, path) in - let numberOfCalls = NIOAtomic.makeAtomic(value: 0) + let numberOfCalls = ManagedAtomic(0) XCTAssertNoThrow(try self.fileIO.readChunked(fileHandle: fileHandle, fromOffset: 0, byteCount: numberOfChunks, chunkSize: 1, allocator: self.allocator, eventLoop: self.eventLoop) { buffer in - numberOfCalls.add(1) + numberOfCalls.wrappingIncrement(ordering: .relaxed) XCTAssertEqual(1, buffer.readableBytes) XCTAssertEqual(UInt8(ascii: "X"), buffer.readableBytesView.first) return self.eventLoop.makeSucceededFuture(()) }.wait()) - XCTAssertEqual(numberOfChunks, numberOfCalls.load()) + XCTAssertEqual(numberOfChunks, numberOfCalls.load(ordering: .relaxed)) }) } diff --git a/Tests/NIOPosixTests/SALChannelTests.swift b/Tests/NIOPosixTests/SALChannelTests.swift index 779a9ec9556..134d4781e14 100644 --- a/Tests/NIOPosixTests/SALChannelTests.swift +++ b/Tests/NIOPosixTests/SALChannelTests.swift @@ -15,7 +15,7 @@ import XCTest import NIOCore @testable import NIOPosix -import NIOConcurrencyHelpers +import Atomics final class SALChannelTest: XCTestCase, SALTest { var group: MultiThreadedEventLoopGroup! @@ -68,7 +68,7 @@ final class SALChannelTest: XCTestCase, SALTest { let serverAddress = try! SocketAddress(ipAddress: "9.8.7.6", port: 5) var buffer = ByteBuffer(string: "12") - let writableNotificationStepExpectation = NIOAtomic.makeAtomic(value: 0) + let writableNotificationStepExpectation = ManagedAtomic(0) final class DoWriteFromWritabilityChangedNotification: ChannelInboundHandler { typealias InboundIn = ByteBuffer @@ -76,16 +76,16 @@ final class SALChannelTest: XCTestCase, SALTest { var numberOfCalls = 0 - var writableNotificationStepExpectation: NIOAtomic + var writableNotificationStepExpectation: ManagedAtomic - init(writableNotificationStepExpectation: NIOAtomic) { + init(writableNotificationStepExpectation: ManagedAtomic) { self.writableNotificationStepExpectation = writableNotificationStepExpectation } func channelWritabilityChanged(context: ChannelHandlerContext) { self.numberOfCalls += 1 - XCTAssertEqual(self.writableNotificationStepExpectation.load(), + XCTAssertEqual(self.writableNotificationStepExpectation.load(ordering: .relaxed), numberOfCalls) switch self.numberOfCalls { case 1: @@ -100,7 +100,7 @@ final class SALChannelTest: XCTestCase, SALTest { buffer.writeString("ABC") // We expect another channelWritabilityChanged notification - XCTAssertTrue(self.writableNotificationStepExpectation.compareAndExchange(expected: 2, desired: 3)) + XCTAssertTrue(self.writableNotificationStepExpectation.compareExchange(expected: 2, desired: 3, ordering: .relaxed).exchanged) context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil) case 3: // Next, we should go to false because we never send all the bytes. @@ -134,7 +134,7 @@ final class SALChannelTest: XCTestCase, SALTest { } // Before sending back the writable notification, we know that that'll trigger a Channel writability change - XCTAssertTrue(writableNotificationStepExpectation.compareAndExchange(expected: 1, desired: 2)) + XCTAssertTrue(writableNotificationStepExpectation.compareExchange(expected: 1, desired: 2, ordering: .relaxed).exchanged) let writableEvent = SelectorEvent(io: [.write], registration: NIORegistration(channel: .socketChannel(channel), interested: [.write], @@ -157,7 +157,7 @@ final class SALChannelTest: XCTestCase, SALTest { // This time, we'll make the write write everything which should also lead to a final channelWritability // change. - XCTAssertTrue(writableNotificationStepExpectation.compareAndExchange(expected: 3, desired: 4)) + XCTAssertTrue(writableNotificationStepExpectation.compareExchange(expected: 3, desired: 4, ordering: .relaxed).exchanged) try self.assertWrite(expectedFD: .max, expectedBytes: buffer, return: .processed(2)) @@ -177,7 +177,7 @@ final class SALChannelTest: XCTestCase, SALTest { channel.pipeline.addHandler(DoWriteFromWritabilityChangedNotification(writableNotificationStepExpectation: writableNotificationStepExpectation)) }.flatMap { // This write should cause a Channel writability change. - XCTAssertTrue(writableNotificationStepExpectation.compareAndExchange(expected: 0, desired: 1)) + XCTAssertTrue(writableNotificationStepExpectation.compareExchange(expected: 0, desired: 1, ordering: .relaxed).exchanged) return channel.writeAndFlush(buffer) } }.salWait()) diff --git a/Tests/NIOPosixTests/SelectorTest.swift b/Tests/NIOPosixTests/SelectorTest.swift index bc6a2cda398..ccdcfc74579 100644 --- a/Tests/NIOPosixTests/SelectorTest.swift +++ b/Tests/NIOPosixTests/SelectorTest.swift @@ -12,10 +12,10 @@ // //===----------------------------------------------------------------------===// +import XCTest import NIOCore @testable import NIOPosix -import NIOConcurrencyHelpers -import XCTest +import Atomics class SelectorTest: XCTestCase { @@ -394,20 +394,20 @@ class SelectorTest: XCTestCase { protocol: 0, socketVector: &socketFDs)) - let numberFires = NIOAtomic.makeAtomic(value: 0) + let numberFires = ManagedAtomic(0) let el = group.next() as! SelectableEventLoop let channelHasBeenClosedPromise = el.makePromise(of: Void.self) let channel = try SocketChannel(socket: FakeSocket(hasBeenClosedPromise: channelHasBeenClosedPromise, socket: socketFDs[0]), eventLoop: el) let sched = el.scheduleRepeatedTask(initialDelay: .microseconds(delayToUseInMicroSeconds), delay: .microseconds(delayToUseInMicroSeconds)) { (_: RepeatedTask) in - numberFires.add(1) + numberFires.wrappingIncrement(ordering: .relaxed) } XCTAssertNoThrow(try el.submit { // EL tick 1: this is used to // - actually arm the timer (timerfd_settime) // - set the channel registration up - if numberFires.load() > 0 { + if numberFires.load(ordering: .relaxed) > 0 { print("WARNING: This test hit a race and this result doesn't mean it actually worked." + " This should really only ever happen in very bizarre conditions.") } @@ -429,7 +429,7 @@ class SelectorTest: XCTestCase { // EL tick 3: happens in the background here. We will likely lose the timer signal because of the // `deregistrationsHappened` workaround in `Selector.swift` and we expect to pick it up again when we enter // `epoll_wait`/`kevent` next. This however only works if the timer event is level triggered. - assert(numberFires.load() > 5, within: .seconds(1), "timer only fired \(numberFires.load()) times") + assert(numberFires.load(ordering: .relaxed) > 5, within: .seconds(1), "timer only fired \(numberFires.load(ordering: .relaxed)) times") sched.cancel() XCTAssertNoThrow(try channelHasBeenClosedPromise.futureResult.wait()) } diff --git a/Tests/NIOPosixTests/SocketChannelTest.swift b/Tests/NIOPosixTests/SocketChannelTest.swift index 85a8138e2c0..c5014365e5f 100644 --- a/Tests/NIOPosixTests/SocketChannelTest.swift +++ b/Tests/NIOPosixTests/SocketChannelTest.swift @@ -15,7 +15,7 @@ import XCTest import NIOCore @testable import NIOPosix import NIOTestUtils -import NIOConcurrencyHelpers +import Atomics private extension Array { /// A helper function that asserts that a predicate is true for all elements. @@ -52,15 +52,15 @@ public final class SocketChannelTest : XCTestCase { // Ensure we can dispatch two concurrent set option's on each others // event loops. - let condition = NIOAtomic.makeAtomic(value: 0) + let condition = ManagedAtomic(0) let futureA = channelA.eventLoop.submit { - condition.add(1) - while condition.load() < 2 { } + condition.wrappingIncrement(ordering: .relaxed) + while condition.load(ordering: .relaxed) < 2 { } _ = channelB.setOption(ChannelOptions.backlog, value: 1) } let futureB = channelB.eventLoop.submit { - condition.add(1) - while condition.load() < 2 { } + condition.wrappingIncrement(ordering: .relaxed) + while condition.load(ordering: .relaxed) < 2 { } _ = channelA.setOption(ChannelOptions.backlog, value: 1) } try futureA.wait() @@ -670,10 +670,10 @@ public final class SocketChannelTest : XCTestCase { // - https://github.com/apple/swift-nio/issues/1030 // - https://github.com/apple/swift-nio/issues/1598 class HandsOutMoodySocketsServerSocket: ServerSocket { - let shouldAcceptsFail: NIOAtomic = .makeAtomic(value: true) + let shouldAcceptsFail = ManagedAtomic(true) override func accept(setNonBlocking: Bool = false) throws -> Socket? { XCTAssertTrue(setNonBlocking) - if self.shouldAcceptsFail.load() { + if self.shouldAcceptsFail.load(ordering: .relaxed) { throw NIOFcntlFailedError() } else { return try Socket(protocolFamily: .inet, @@ -720,7 +720,7 @@ public final class SocketChannelTest : XCTestCase { XCTAssertEqual(["errorCaught"], eventCounter.allTriggeredEvents()) XCTAssertEqual(1, eventCounter.errorCaughtCalls) - serverSock.shouldAcceptsFail.store(false) + serverSock.shouldAcceptsFail.store(false, ordering: .relaxed) XCTAssertNoThrow(try serverChan.eventLoop.submit { serverChan.readable() diff --git a/Tests/NIOPosixTests/StreamChannelsTest.swift b/Tests/NIOPosixTests/StreamChannelsTest.swift index 8ebc1ca5e71..02059d33391 100644 --- a/Tests/NIOPosixTests/StreamChannelsTest.swift +++ b/Tests/NIOPosixTests/StreamChannelsTest.swift @@ -16,7 +16,7 @@ import XCTest import NIOCore @testable import NIOPosix import NIOTestUtils -import NIOConcurrencyHelpers +import Atomics class StreamChannelTest: XCTestCase { var buffer: ByteBuffer! = nil @@ -309,21 +309,21 @@ class StreamChannelTest: XCTestCase { class FailOnReadHandler: ChannelInboundHandler { typealias InboundIn = ByteBuffer - let areReadsOkayNow: NIOAtomic + let areReadsOkayNow: ManagedAtomic - init(areReadOkayNow: NIOAtomic) { + init(areReadOkayNow: ManagedAtomic) { self.areReadsOkayNow = areReadOkayNow } func channelRead(context: ChannelHandlerContext, data: NIOAny) { - guard self.areReadsOkayNow.load() else { + guard self.areReadsOkayNow.load(ordering: .relaxed) else { XCTFail("unexpected read of \(self.unwrapInboundIn(data))") return } } func channelReadComplete(context: ChannelHandlerContext) { - guard self.areReadsOkayNow.load() else { + guard self.areReadsOkayNow.load(ordering: .relaxed) else { XCTFail("unexpected readComplete") return } @@ -331,11 +331,11 @@ class StreamChannelTest: XCTestCase { } func runTest(receiver: Channel, sender: Channel) throws { - let sends = NIOAtomic.makeAtomic(value: 0) + let sends = ManagedAtomic(0) precondition(receiver.eventLoop !== sender.eventLoop, "this test cannot run if sender and receiver live on the same EventLoop. \(receiver)") XCTAssertNoThrow(try receiver.setOption(ChannelOptions.autoRead, value: false).wait()) - let areReadsOkayNow: NIOAtomic = .makeAtomic(value: false) + let areReadsOkayNow = ManagedAtomic(false) XCTAssertNoThrow(try receiver.pipeline.addHandler(FailOnReadHandler(areReadOkayNow: areReadsOkayNow)).wait()) // We will immediately send exactly the amount of data that fits in the receiver's receive buffer. @@ -350,14 +350,14 @@ class StreamChannelTest: XCTestCase { // we send one byte at a time. Sending the receive buffer will trigger the EVFILT_EXCEPT loop // (rdar://53656794) for UNIX Domain Sockets and the additional 1 byte send loop will also pretty // reliably trigger it for TCP sockets. - let myBuffer = allBuffer.readSlice(length: sends.load() == 0 ? receiveBufferSize : 1)! + let myBuffer = allBuffer.readSlice(length: sends.load(ordering: .relaxed) == 0 ? receiveBufferSize : 1)! sender.writeAndFlush(myBuffer).map { - sends.add(1) + sends.wrappingIncrement(ordering: .relaxed) sender.eventLoop.scheduleTask(in: .microseconds(1)) { send() } }.whenFailure { error in - XCTAssert(areReadsOkayNow.load(), "error before the channel should go down") + XCTAssert(areReadsOkayNow.load(ordering: .relaxed), "error before the channel should go down") guard case .some(.ioOnClosedChannel) = error as? ChannelError else { XCTFail("unexpected error: \(error)") return @@ -373,11 +373,11 @@ class StreamChannelTest: XCTestCase { let eventLoop = (receiver.eventLoop as! SelectableEventLoop) XCTAssertNoThrow(try eventLoop._selector.testsOnly_withUnsafeSelectorFD { fd in try assertNoSelectorChanges(fd: fd, selector:eventLoop._selector) - }, "after \(sends.load()) sends, we got an unexpected selector event for \(receiver)") + }, "after \(sends.load(ordering: .relaxed)) sends, we got an unexpected selector event for \(receiver)") usleep(10000) } // We'll soon close the channels, so reads are now acceptable (from the EOF that we may read). - XCTAssertTrue(areReadsOkayNow.compareAndExchange(expected: false, desired: true)) + XCTAssertTrue(areReadsOkayNow.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged) } XCTAssertNoThrow(try forEachCrossConnectedStreamChannelPair(forceSeparateEventLoops: true, runTest)) }