AsyncStream is What Async/Await Should've Been
Apple did something right with AsyncStream. But did they go far enough?
Yes, you read it right. I’ve been researching Swift Concurrency for my book and I think AsyncStream is what async/await should’ve been because it most closely resembles RxSwift Observables. Let me explain.
The main way you use and create AsyncStreams is via the Continuation API:
func generateNumbersWithContinuation() -> AsyncStream<Int> {
    AsyncStream { continuation in
        continuation.onTermination = { termination in
            // do the cleanup here
        }
        Task {
            for i in 1...5 {
                try await Task.sleep(nanoseconds: 500_000_000)
                continuation.yield(i)
            }
            continuation.finish()
        }
    }
}

// run it with
Task {
    for await number in generateNumbersWithContinuation() {
        print(number)
    }
}
Here, we create an AsyncStream that emits new values asynchronously over time and calls finish when it’s done. We also set the onTermination callback, which is where you clean up whatever underlying resources were producing the values for the stream.
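To make the cleanup part concrete, here is a minimal sketch in which the "underlying resource" is simply the producer task, and onTermination cancels it. The names makeTicker, count and intervalNanoseconds are made up for this illustration; treat it as one possible arrangement, not the canonical one.

```swift
// Sketch only: `makeTicker` and its parameters are hypothetical names.
func makeTicker(count: Int, intervalNanoseconds: UInt64) -> AsyncStream<Int> {
    AsyncStream { continuation in
        // Keep a handle to the producer so the cleanup block can stop it.
        let producer = Task {
            for i in stride(from: 1, through: count, by: 1) {
                do {
                    try await Task.sleep(nanoseconds: intervalNanoseconds)
                } catch {
                    break // Task.sleep throws once the producer task is cancelled.
                }
                continuation.yield(i)
            }
            continuation.finish()
        }
        // Cleanup: called on termination, e.g. after finish() or when the
        // consuming task is cancelled.
        continuation.onTermination = { _ in
            producer.cancel()
        }
    }
}
```

With this shape, cancelling the task that iterates the stream should also stop the producer, instead of leaving it sleeping and yielding into a stream nobody reads.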
Conceptually, and almost literally, this is an RxSwift Observable:
import Foundation
import RxSwift

private let disposeBag = DisposeBag()

func generateNumbersWithObservable() -> Observable<Int> {
    return Observable.create { observer in
        for i in 1...5 {
            Thread.sleep(forTimeInterval: 0.5)
            observer.onNext(i)
        }
        observer.onCompleted()
        return Disposables.create {
            // do the cleanup here
        }
    }
    // run the subscription work off the calling thread
    .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .userInitiated))
}

// run it with
generateNumbersWithObservable().subscribe(onNext: { number in
    print(number)
}).disposed(by: disposeBag)
Here, we do literally the same thing: we have an Observable that emits values asynchronously over time and calls onCompleted when it’s done. The block passed to Disposables.create is where we clean up whatever underlying resources were producing the values for the observable.
Congratulations Apple, you reinvented the wheel! Ahem… but I digress.
This is not unexpected since the concept of reactive streams is very good, so it’s natural for Apple to steal/get inspired by it.
…but where Apple didn’t go far enough is in making async/await functions mimic, and be implemented similarly to, RxSwift Singles.
Conceptually, an async operation, regardless of the language or framework you use to implement it, is a job that needs to be executed asynchronously. It has a main block of work/setup plus a cleanup part that winds things down when the operation finishes or is cancelled.
When we look at the AsyncStream and Observable examples above (albeit very simplified), we see that in action: there is a main chunk of code for the main action, and then there is a cleanup block to wind down the operation.
The difference from async/await is that a stream assumes multiple results will be produced. With async/await, typically, there is one result produced at the end of the operation (or no result, which you can mimic with a Void element type in both AsyncStream and Observable).
func generateNumberWithAsyncAwait() async -> Int {
    try? await Task.sleep(nanoseconds: 500_000_000)
    return Int.random(in: 1...5)
}

// run it with
Task {
    let result = await generateNumberWithAsyncAwait()
    print(result)
}
Above is an example of an async/await function. It has the main block of code to perform the work, but where is the dedicated cleanup block to wind down the operation? It’s not there. Cancellation is cooperative: by default, all you do is poll a procedural flag with Task.isCancelled or call try Task.checkCancellation() at suitable points. (You can wrap the body in withTaskCancellationHandler, but that is an opt-in wrapper, not part of the function’s shape.) The problem is that, most of the time, developers will simply forget to do it.
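For reference, here is roughly what those manual checks look like. The first function polls with Task.checkCancellation(). The second uses withTaskCancellationHandler, which the standard library also offers, but you have to opt into it inside the function body, so the "easy to forget" point still stands. The function names and the cleanup parameter are hypothetical, invented for this sketch.

```swift
// Sketch only: both function names and the `cleanup` parameter are hypothetical.

// Cooperative polling: the function itself decides where to check.
func sumWithCancellationChecks(_ values: [Int]) async throws -> Int {
    var total = 0
    for value in values {
        // Throws CancellationError if the surrounding task has been cancelled.
        try Task.checkCancellation()
        total += value
        await Task.yield()
    }
    return total
}

// Opt-in handler: `cleanup` runs if the surrounding task gets cancelled.
func delayedValue(
    _ value: Int,
    cleanup: @escaping @Sendable () -> Void
) async throws -> Int {
    try await withTaskCancellationHandler {
        try await Task.sleep(nanoseconds: 50_000_000)
        return value
    } onCancel: {
        cleanup()
    }
}
```

Note that the onCancel handler only fires on cancellation, not on normal completion, so it is still not the single "finished or cancelled" cleanup block that onTermination and Disposables.create give you.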
Now, if only Apple would implement async/await functions similarly to AsyncStream, so that they have an onTermination callback for the cleanup… Oh wait, if they did that, they would essentially reimplement/reinvent the Single from RxSwift!
Singles in RxSwift are Observables made for one purpose: to produce one value asynchronously. Since they are Observables by nature, they have the same traits, such as the Disposables.create callback block to properly clean up after the operation is done or cancelled:
import Foundation
import RxSwift

private let disposeBag = DisposeBag()

func generateNumbersWithSingle() -> Single<Int> {
    return Single.create { observer in
        Thread.sleep(forTimeInterval: 0.5)
        observer(.success(Int.random(in: 1...5)))
        return Disposables.create {
            // do the cleanup here
        }
    }
    // run the subscription work off the calling thread
    .subscribe(on: ConcurrentDispatchQueueScheduler(qos: .userInitiated))
}

// run it with
generateNumbersWithSingle().subscribe(onSuccess: { number in
    print(number)
}).disposed(by: disposeBag)
Here, just like with Observable or AsyncStream, you have an explicit dedicated block for cleanup of the asynchronous operation.
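For comparison, you can approximate that shape today with a one-shot AsyncStream: yield a single value, finish, and put the cleanup in onTermination. This is only a sketch of the idea. makeSingle, firstValue, work and cleanup are names I made up, not anything Apple ships.

```swift
// Sketch only: `makeSingle` and `firstValue` are hypothetical helpers.

// A stream that yields at most one value and then finishes.
func makeSingle<Value: Sendable>(
    work: @escaping @Sendable () async -> Value,
    cleanup: @escaping @Sendable () -> Void
) -> AsyncStream<Value> {
    AsyncStream { continuation in
        let producer = Task {
            let value = await work()
            continuation.yield(value)
            continuation.finish()
        }
        // Dedicated cleanup block, in the spirit of Disposables.create.
        continuation.onTermination = { _ in
            producer.cancel()
            cleanup()
        }
    }
}

// Reads the first (and only) element, or nil if the stream ended without one.
func firstValue<Value>(of stream: AsyncStream<Value>) async -> Value? {
    for await value in stream {
        return value
    }
    return nil
}
```

It works, but you pay for it with an optional result and a stream type that doesn’t say "exactly one value", which is the gap a Single fills.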
If only async/await was like that! (sigh…) Unfortunately, it remains the “AsyncStream that never was”… Apple should’ve just stolen/reimplemented the Rx Single…
I had never looked at it this way, it's very interesting.
However, I think they carefully avoided overlap with Combine, which is their implementation of Rx in the end.