Skip to content
11 changes: 8 additions & 3 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@

import PackageDescription

let swiftAtomics: PackageDescription.Target.Dependency = .product(name: "Atomics", package: "swift-atomics")

var targets: [PackageDescription.Target] = [
.target(name: "NIOCore",
dependencies: ["NIOConcurrencyHelpers", "CNIOLinux", "CNIOWindows"]),
.target(name: "_NIODataStructures"),
.target(name: "NIOEmbedded",
dependencies: ["NIOCore",
"NIOConcurrencyHelpers",
"_NIODataStructures"]),
"_NIODataStructures",
swiftAtomics]),
.target(name: "NIOPosix",
dependencies: ["CNIOLinux",
"CNIODarwin",
"CNIOWindows",
"NIOConcurrencyHelpers",
"NIOCore",
"_NIODataStructures"]),
"_NIODataStructures",
swiftAtomics]),
.target(name: "NIO",
dependencies: ["NIOCore",
"NIOEmbedded",
Expand Down Expand Up @@ -87,7 +91,7 @@ var targets: [PackageDescription.Target] = [
dependencies: ["NIOPosix", "NIOCore"],
exclude: ["README.md"]),
.target(name: "NIOTestUtils",
dependencies: ["NIOPosix", "NIOCore", "NIOEmbedded", "NIOHTTP1"]),
dependencies: ["NIOPosix", "NIOCore", "NIOEmbedded", "NIOHTTP1", swiftAtomics]),
.executableTarget(name: "NIOCrashTester",
dependencies: ["NIOPosix", "NIOCore", "NIOEmbedded", "NIOHTTP1", "NIOWebSocket", "NIOFoundationCompat"]),
.executableTarget(name: "NIOAsyncAwaitDemo",
Expand Down Expand Up @@ -135,6 +139,7 @@ let package = Package(
.library(name: "NIOTestUtils", targets: ["NIOTestUtils"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"),
],
targets: targets
)
2 changes: 2 additions & 0 deletions Sources/NIOConcurrencyHelpers/NIOAtomic.swift
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ extension UInt: NIOAtomicPrimitive {
/// By necessity, all atomic values are references: after all, it makes no
/// sense to talk about managing an atomic value when each time it's modified
/// the thread that modified it gets a local copy!
@available(*, deprecated, message:"please use Atomics.ManagedAtomic instead")
public final class NIOAtomic<T: NIOAtomicPrimitive> {
@usableFromInline
typealias Manager = ManagedBufferPointer<Void, T.AtomicWrapper>
Expand Down Expand Up @@ -313,6 +314,7 @@ public final class NIOAtomic<T: NIOAtomicPrimitive> {
}

#if compiler(>=5.5) && canImport(_Concurrency)
@available(*, deprecated)
extension NIOAtomic: Sendable {

}
Expand Down
3 changes: 2 additions & 1 deletion Sources/NIOConcurrencyHelpers/atomics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ fileprivate func sys_sched_yield() {
/// Atomic primitives are useful when building constructs that need to
/// communicate or cooperate across multiple threads. In the case of
/// SwiftNIO this usually involves communicating across multiple event loops.
@available(*, deprecated, message: "please use Atomics.UnsafeAtomic instead")
public struct UnsafeEmbeddedAtomic<T: AtomicPrimitive> {
@usableFromInline
internal let value: OpaquePointer
Expand Down Expand Up @@ -173,7 +174,7 @@ public struct UnsafeEmbeddedAtomic<T: AtomicPrimitive> {
/// By necessity, all atomic values are references: after all, it makes no
/// sense to talk about managing an atomic value when each time it's modified
/// the thread that modified it gets a local copy!
@available(*, deprecated, message:"please use NIOAtomic instead")
@available(*, deprecated, message:"please use Atomics.ManagedAtomic instead")
public final class Atomic<T: AtomicPrimitive> {
@usableFromInline
internal let embedded: UnsafeEmbeddedAtomic<T>
Expand Down
15 changes: 8 additions & 7 deletions Sources/NIOEmbedded/AsyncEmbeddedEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
//
//===----------------------------------------------------------------------===//
#if compiler(>=5.5.2) && canImport(_Concurrency)
import Atomics
import Dispatch
import _NIODataStructures
import NIOCore
Expand Down Expand Up @@ -62,9 +63,9 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {

/// The current "time" for this event loop. This is an amount in nanoseconds.
/// As we need to access this from any thread, we store this as an atomic.
private let _now = NIOAtomic<UInt64>.makeAtomic(value: 0)
private let _now = ManagedAtomic<UInt64>(0)
internal var now: NIODeadline {
return NIODeadline.uptimeNanoseconds(self._now.load())
return NIODeadline.uptimeNanoseconds(self._now.load(ordering: .relaxed))
}

/// This is used to derive an identifier for this loop.
Expand All @@ -80,7 +81,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {
// arbitrary threads. This is required by the EventLoop protocol and cannot be avoided.
// Specifically, Scheduled<T> creation requires us to be able to define the cancellation
// operation, so the task ID has to be created early.
private let scheduledTaskCounter = NIOAtomic<UInt64>.makeAtomic(value: 0)
private let scheduledTaskCounter = ManagedAtomic<UInt64>(0)
private var scheduledTasks = PriorityQueue<EmbeddedScheduledTask>()

/// Keep track of where promises are allocated to ensure we can identify their source if they leak.
Expand Down Expand Up @@ -143,7 +144,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {
@discardableResult
public func scheduleTask<T>(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled<T> {
let promise: EventLoopPromise<T> = self.makePromise()
let taskID = self.scheduledTaskCounter.add(1)
let taskID = self.scheduledTaskCounter.loadThenWrappingIncrement(ordering: .relaxed)

let scheduled = Scheduled(promise: promise, cancellationTask: {
if self.inEventLoop {
Expand Down Expand Up @@ -270,7 +271,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {

// Set the time correctly before we call into user code, then
// call in for all tasks.
self._now.store(nextTask.readyTime.uptimeNanoseconds)
self._now.store(nextTask.readyTime.uptimeNanoseconds, ordering: .relaxed)

for task in tasks {
task.task()
Expand All @@ -280,7 +281,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {
}

// Finally ensure we got the time right.
self._now.store(newTime.uptimeNanoseconds)
self._now.store(newTime.uptimeNanoseconds, ordering: .relaxed)

continuation.resume()
}
Expand Down Expand Up @@ -311,7 +312,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {
internal func drainScheduledTasksByRunningAllCurrentlyScheduledTasks() {
var currentlyScheduledTasks = self.scheduledTasks
while let nextTask = currentlyScheduledTasks.pop() {
self._now.store(nextTask.readyTime.uptimeNanoseconds)
self._now.store(nextTask.readyTime.uptimeNanoseconds, ordering: .relaxed)
nextTask.task()
}
// Just fail all the remaining scheduled tasks. Despite having run all the tasks that were
Expand Down
13 changes: 7 additions & 6 deletions Sources/NIOPosix/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import NIOCore
import NIOConcurrencyHelpers
import Atomics

private struct SocketChannelLifecycleManager {
// MARK: Types
Expand All @@ -35,7 +36,7 @@ private struct SocketChannelLifecycleManager {
// MARK: properties
private let eventLoop: EventLoop
// this is queried from the Channel, ie. must be thread-safe
internal let isActiveAtomic: NIOAtomic<Bool>
internal let isActiveAtomic: ManagedAtomic<Bool>
// these are only to be accessed on the EventLoop

// have we seen the `.readEOF` notification
Expand All @@ -50,9 +51,9 @@ private struct SocketChannelLifecycleManager {
self.eventLoop.assertInEventLoop()
switch (oldValue, self.currentState) {
case (_, .activated):
self.isActiveAtomic.store(true)
self.isActiveAtomic.store(true, ordering: .relaxed)
case (.activated, _):
self.isActiveAtomic.store(false)
self.isActiveAtomic.store(false, ordering: .relaxed)
default:
()
}
Expand All @@ -63,7 +64,7 @@ private struct SocketChannelLifecycleManager {
// isActiveAtomic needs to be injected as it's accessed from arbitrary threads and `SocketChannelLifecycleManager` is usually held mutable
internal init(
eventLoop: EventLoop,
isActiveAtomic: NIOAtomic<Bool>,
isActiveAtomic: ManagedAtomic<Bool>,
supportReconnect: Bool
) {
self.eventLoop = eventLoop
Expand Down Expand Up @@ -238,7 +239,7 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
private let closePromise: EventLoopPromise<Void>
internal let selectableEventLoop: SelectableEventLoop
private let _offEventLoopLock = Lock()
private let isActiveAtomic: NIOAtomic<Bool> = .makeAtomic(value: false)
private let isActiveAtomic: ManagedAtomic<Bool> = .init(false)
// just a thread-safe way of having something to print about the socket from any thread
internal let socketDescription: String

Expand Down Expand Up @@ -345,7 +346,7 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan

// This is `Channel` API so must be thread-safe.
public var isActive: Bool {
return self.isActiveAtomic.load()
return self.isActiveAtomic.load(ordering: .relaxed)
}

// This is `Channel` API so must be thread-safe.
Expand Down
9 changes: 5 additions & 4 deletions Sources/NIOPosix/MultiThreadedEventLoopGroup.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import NIOCore
import NIOConcurrencyHelpers
import Dispatch
import Atomics

struct NIORegistration: Registration {
enum ChannelType {
Expand All @@ -33,7 +34,7 @@ struct NIORegistration: Registration {
var registrationID: SelectorRegistrationID
}

private let nextEventLoopGroupID = NIOAtomic.makeAtomic(value: 0)
private let nextEventLoopGroupID = ManagedAtomic(0)

/// Called per `NIOThread` that is created for an EventLoop to do custom initialization of the `NIOThread` before the actual `EventLoop` is run on it.
typealias ThreadInitializer = (NIOThread) -> Void
Expand Down Expand Up @@ -62,7 +63,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
private static let threadSpecificEventLoop = ThreadSpecificVariable<SelectableEventLoop>()

private let myGroupID: Int
private let index = NIOAtomic<Int>.makeAtomic(value: 0)
private let index = ManagedAtomic<Int>(0)
private var eventLoops: [SelectableEventLoop]
private let shutdownLock: Lock = Lock()
private var runState: RunState = .running
Expand Down Expand Up @@ -148,7 +149,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
/// - threadInitializers: The `ThreadInitializer`s to use.
internal init(threadInitializers: [ThreadInitializer],
selectorFactory: @escaping () throws -> NIOPosix.Selector<NIORegistration> = NIOPosix.Selector<NIORegistration>.init) {
let myGroupID = nextEventLoopGroupID.add(1)
let myGroupID = nextEventLoopGroupID.loadThenWrappingIncrement(ordering: .relaxed)
self.myGroupID = myGroupID
var idx = 0
self.eventLoops = [] // Just so we're fully initialised and can vend `self` to the `SelectableEventLoop`.
Expand Down Expand Up @@ -187,7 +188,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
///
/// - returns: The next `EventLoop` to use.
public func next() -> EventLoop {
return eventLoops[abs(index.add(1) % eventLoops.count)]
return eventLoops[abs(index.loadThenWrappingIncrement(ordering: .relaxed) % eventLoops.count)]
}

/// Returns the current `EventLoop` if we are on an `EventLoop` of this `MultiThreadedEventLoopGroup` instance.
Expand Down
9 changes: 5 additions & 4 deletions Sources/NIOPosix/PendingDatagramWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//
import NIOCore
import NIOConcurrencyHelpers
import Atomics

private struct PendingDatagramWrite {
var data: ByteBuffer
Expand Down Expand Up @@ -400,7 +400,7 @@ final class PendingDatagramWritesManager: PendingWritesManager {
private var state = PendingDatagramWritesState()

internal var waterMark: ChannelOptions.Types.WriteBufferWaterMark = ChannelOptions.Types.WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024)
internal let channelWritabilityFlag: NIOAtomic<Bool> = .makeAtomic(value: true)
internal let channelWritabilityFlag = ManagedAtomic<Bool>(true)
internal var publishedWritability = true
internal var writeSpinCount: UInt = 16
private(set) var isOpen = true
Expand Down Expand Up @@ -452,7 +452,8 @@ final class PendingDatagramWritesManager: PendingWritesManager {
assert(self.isOpen)
self.state.append(pendingWrite)

if self.state.bytes > waterMark.high && channelWritabilityFlag.compareAndExchange(expected: true, desired: false) {
if self.state.bytes > waterMark.high,
channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged {
// Returns false to signal the Channel became non-writable and we need to notify the user.
self.publishedWritability = false
return false
Expand Down Expand Up @@ -550,7 +551,7 @@ final class PendingDatagramWritesManager: PendingWritesManager {
let (promise, result) = self.state.didWrite(data, messages: messages)

if self.state.bytes < waterMark.low {
channelWritabilityFlag.store(true)
channelWritabilityFlag.store(true, ordering: .relaxed)
}

self.fulfillPromise(promise)
Expand Down
15 changes: 8 additions & 7 deletions Sources/NIOPosix/PendingWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//
import NIOCore
import NIOConcurrencyHelpers
import Atomics

private struct PendingStreamWrite {
var data: IOData
Expand Down Expand Up @@ -283,7 +283,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
private var storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>

internal var waterMark: ChannelOptions.Types.WriteBufferWaterMark = ChannelOptions.Types.WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024)
internal let channelWritabilityFlag: NIOAtomic<Bool> = .makeAtomic(value: true)
internal let channelWritabilityFlag = ManagedAtomic(true)
internal var publishedWritability = true

internal var writeSpinCount: UInt = 16
Expand Down Expand Up @@ -315,7 +315,8 @@ final class PendingStreamWritesManager: PendingWritesManager {
assert(self.isOpen)
self.state.append(.init(data: data, promise: promise))

if self.state.bytes > waterMark.high && channelWritabilityFlag.compareAndExchange(expected: true, desired: false) {
if self.state.bytes > waterMark.high,
channelWritabilityFlag.compareExchange(expected: true, desired: false, ordering: .relaxed).exchanged {
// Returns false to signal the Channel became non-writable and we need to notify the user.
self.publishedWritability = false
return false
Expand Down Expand Up @@ -364,7 +365,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
let (promise, result) = self.state.didWrite(itemCount: itemCount, result: result)

if self.state.bytes < waterMark.low {
channelWritabilityFlag.store(true)
channelWritabilityFlag.store(true, ordering: .relaxed)
}

promise?.succeed(())
Expand Down Expand Up @@ -459,7 +460,7 @@ internal protocol PendingWritesManager: AnyObject {
var isFlushPending: Bool { get }
var writeSpinCount: UInt { get }
var currentBestWriteMechanism: WriteMechanism { get }
var channelWritabilityFlag: NIOAtomic<Bool> { get }
var channelWritabilityFlag: ManagedAtomic<Bool> { get }

/// Represents the writability state the last time we published a writability change to the `Channel`.
/// This is used in `triggerWriteOperations` to determine whether we need to trigger a writability
Expand All @@ -470,7 +471,7 @@ internal protocol PendingWritesManager: AnyObject {
extension PendingWritesManager {
// This is called from `Channel` API so must be thread-safe.
var isWritable: Bool {
return self.channelWritabilityFlag.load()
return self.channelWritabilityFlag.load(ordering: .relaxed)
}

internal func triggerWriteOperations(triggerOneWriteOperation: (WriteMechanism) throws -> OneWriteOperationResult) throws -> OverallWriteResult {
Expand Down Expand Up @@ -514,6 +515,6 @@ extension PendingWritesManager {
extension PendingStreamWritesManager: CustomStringConvertible {
var description: String {
return "PendingStreamWritesManager { isFlushPending: \(self.isFlushPending), " +
/* */ "writabilityFlag: \(self.channelWritabilityFlag.load())), state: \(self.state) }"
/* */ "writabilityFlag: \(self.channelWritabilityFlag.load(ordering: .relaxed))), state: \(self.state) }"
}
}
7 changes: 4 additions & 3 deletions Sources/NIOPosix/SelectableEventLoop.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import Dispatch
import NIOCore
import NIOConcurrencyHelpers
import _NIODataStructures
import Atomics

/// Execute the given closure and ensure we release all auto pools if needed.
@inlinable
Expand Down Expand Up @@ -73,7 +74,7 @@ internal final class SelectableEventLoop: EventLoop {
// This may only be read/written while holding the _tasksLock.
internal var _pendingTaskPop = false
@usableFromInline
internal var scheduledTaskCounter = NIOAtomic.makeAtomic(value: UInt64(0))
internal var scheduledTaskCounter = ManagedAtomic<UInt64>(0)
@usableFromInline
internal var _scheduledTasks = PriorityQueue<ScheduledTask>()

Expand Down Expand Up @@ -276,7 +277,7 @@ Further information:
@inlinable
internal func scheduleTask<T>(deadline: NIODeadline, _ task: @escaping () throws -> T) -> Scheduled<T> {
let promise: EventLoopPromise<T> = self.makePromise()
let task = ScheduledTask(id: self.scheduledTaskCounter.add(1), {
let task = ScheduledTask(id: self.scheduledTaskCounter.loadThenWrappingIncrement(ordering: .relaxed), {
do {
promise.succeed(try task())
} catch let err {
Expand Down Expand Up @@ -317,7 +318,7 @@ Further information:
@inlinable
internal func execute(_ task: @escaping () -> Void) {
// nothing we can do if we fail enqueuing here.
try? self._schedule0(ScheduledTask(id: self.scheduledTaskCounter.add(1), task, { error in
try? self._schedule0(ScheduledTask(id: self.scheduledTaskCounter.loadThenWrappingIncrement(ordering: .relaxed), task, { error in
// do nothing
}, .now()))
}
Expand Down
Loading