AsyncSequence <-> Combine - equivalence (Part 2a)

This article is mostly for the historical record. I will try to publish an update without the inner actor workaround (no longer needed in Xcode 13 beta 5), but having written it, it seems a shame to just delete it.

In part 1 I created an extension on AsyncSequence to return a Combine publisher representing the sequence. In this post I'm exploring the other direction: wrapping a Combine publisher in an AsyncSequence. This is actually a stopgap article covering the version written before Xcode 13 beta 5, which fixed the bugs that prevented reliably calling closures.

Like the previous post, the aim was mainly to learn how these features work rather than to produce the simplest, fastest or best solution. If you really need this type of operation, you should probably base it on AsyncStream, which wraps up and simplifies a lot of the complexity. Marin Todorov has written his own pair of blog posts covering the AsyncStream approach.
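For comparison, here is a minimal sketch of the AsyncStream-based approach (using its throwing variant), assuming Combine and a Swift 5.5+ toolchain. The property name `streamed` is hypothetical and not part of Combine. One design difference worth noting: `sink` requests unlimited demand and the stream buffers values (unbounded by default), whereas the hand-written iterator in this post only lets the publisher deliver a value when one is asked for.

```swift
import Combine

extension Publisher where Failure == Error {
    /// Hypothetical helper: bridges the publisher into an AsyncThrowingStream.
    var streamed: AsyncThrowingStream<Output, Error> {
        AsyncThrowingStream { continuation in
            // sink requests unlimited demand; the stream buffers the values.
            let cancellable = self.sink(
                receiveCompletion: { completion in
                    switch completion {
                    case .finished:
                        continuation.finish()
                    case .failure(let error):
                        continuation.finish(throwing: error)
                    }
                },
                receiveValue: { value in
                    continuation.yield(value)
                }
            )
            // Cancel the Combine subscription if the consumer stops iterating early.
            continuation.onTermination = { @Sendable _ in
                cancellable.cancel()
            }
        }
    }
}
```

Usage is then just `for try await value in somePublisher.streamed { ... }`.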

With both 

Complicated by bug SR-14875

My solution is somewhat complicated by this bug in the interaction between actors and resuming continuations from their context. The workaround is similar to the one in the previous post: the iterator is a class instead of an actor, and it owns an actor containing the state (the subscription and the continuations). This is resolved in Xcode 13 beta 5, so this article will very shortly be superseded by the simplified version.
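Stripped of Combine, the shape of that workaround might look like the following sketch. The names `ValueState` and `ValueBox` are hypothetical and not from the original code. The actor owns the mutable state, but instead of resuming waiting continuations itself it returns them, so the owning class resumes them outside the actor's context.

```swift
/// Hypothetical state actor: holds a value plus any callers waiting for it.
actor ValueState {
    private var value: Int?
    private var waiters: [CheckedContinuation<Int, Never>] = []

    /// Returns the value if already set, otherwise suspends until `set` provides one.
    func get() async -> Int {
        if let value = value { return value }
        return await withCheckedContinuation { continuation in
            waiters.append(continuation)
        }
    }

    /// Stores the value and hands the pending continuations back to the caller
    /// rather than resuming them inside the actor.
    func set(_ newValue: Int) -> [CheckedContinuation<Int, Never>] {
        value = newValue
        let pending = waiters
        waiters.removeAll()
        return pending
    }
}

/// The public type is a class that owns the actor, like the Iterator owning its inner actor.
final class ValueBox: Sendable {
    private let state = ValueState()

    func get() async -> Int {
        await state.get()
    }

    func set(_ newValue: Int) async {
        let pending = await state.set(newValue)
        // Resume outside the actor's isolation.
        for continuation in pending {
            continuation.resume(returning: newValue)
        }
    }
}
```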

Implementation

This isn't necessarily described in the order I implemented it. It evolved over a few sessions, with breaks while I looked into the issues I was seeing with resuming continuations, and one of the last changes was switching from type-erasing the publisher with AnyPublisher to using generics properly. You can see the history if you want to see how it evolved.

The API

The primary API is an asyncSequence property on any Publisher. For now I limit it to publishers with a Failure type of Error. This could reasonably be duplicated for Never; any other error type would need either mapping into the element type (some sort of Result) or mapping to Error.

import Combine

extension Publisher where Self.Failure == Error {
    /// Exposes any publisher whose Failure is Error as an AsyncSequence.
    public var asyncSequence: PublisherAsyncSequence<Self> {
        PublisherAsyncSequence(publisher: self)
    }
}
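The `Failure == Error` restriction can be met by adapting a publisher before calling `asyncSequence`. A minimal sketch of the three options mentioned, using standard Combine operators; `ParseError` is a hypothetical error type for illustration.

```swift
import Combine

/// Hypothetical concrete error type for illustration.
enum ParseError: Error {
    case bad
}

// Failure == Never: widen it to Error.
let neverFailing = Just(1)
    .setFailureType(to: Error.self)

// A concrete failure type: erase it to Error.
let erased = Fail<Int, ParseError>(error: .bad)
    .mapError { $0 as Error }

// Or move the failure into the element type, so the publisher itself never fails.
let asResults = Fail<Int, ParseError>(error: .bad)
    .map { Result<Int, ParseError>.success($0) }
    .catch { Just(Result<Int, ParseError>.failure($0)) }
    .setFailureType(to: Error.self)
```

Each of these now has `Failure == Error`, so the extension above applies to all of them.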

The PublisherAsyncSequence itself is the next thing to define. I'll define the necessary extensions for protocol conformances afterwards.

public struct PublisherAsyncSequence<P> where P : Publisher, P.Failure == Error {
    init(publisher: P) {
        self.pub = publisher
    }

    let pub: P

    public typealias Element = P.Output
    public typealias Failure = P.Failure
}

Still on the public API, we now need to implement the AsyncSequence protocol. This is fairly simple but leaves us with the loose end of actually implementing the real work within the iterator (which will be a nested class of PublisherAsyncSequence, although there are other options).

extension PublisherAsyncSequence : AsyncSequence {
    public func makeAsyncIterator() -> Iterator {
        let itr = Iterator()
        // subscribe(_:) is the documented entry point; it calls receive(subscriber:) internally.
        pub.subscribe(itr)
        return itr
    }
}

The iterator needs to conform to AsyncIteratorProtocol, and this conformance is actually fairly simple (again, the bulk of the logic that does the actual work is described below). The next() call is async, so it can await the real work, which is delegated to an actor within the iterator (iActor is my variable name for the inner actor).

extension PublisherAsyncSequence.Iterator : AsyncIteratorProtocol {
    public typealias Element = P.Output

    public func next() async throws -> Element? {
        try await iActor.next()
    }
}

The iterator also needs to conform to Subscriber so that it can pull data out of the publisher. The main detail worth noting here is that we always return a demand of .none, so the publisher won't provide another value until we ask for one. All the actual operations are done elsewhere, asynchronously, using `Task` (which replaces `async`, the spelling used in the initial proposal and the WWDC talks), because the operations take place in the actor's context to prevent race conditions.

In receive(subscription:) you can see that the call that sets the subscription on the actor returns a continuation, which is then resumed immediately. This is because if next() is called on the sequence before the subscription is set up, it must be made to wait. Ideally the continuation would be resumed from within the actor rather than returned, but due to current bugs (as of beta 3: SR-14802 and a dupe that I reported) there are frequently hangs if an actor resumes a continuation.

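extension PublisherAsyncSequence.Iterator : Subscriber {
    public typealias Input = P.Output
    public typealias Failure = P.Failure

    public func receive(_ input: Input) -> Subscribers.Demand {
        // Hand the value to the actor asynchronously. Separate unstructured
        // Tasks are not guaranteed to start in the order they are created.
        Task {
            await receive(input: input)
        }
        // No extra demand: values are only delivered when explicitly requested.
        return .none
    }

    public func receive(subscription: Subscription) {
        Task {
            // The actor returns any continuation waiting in next() so it can be
            // resumed here, outside the actor's context (workaround for the beta bugs).
            let continuation = await self.iActor.setSubscription(subscription: subscription)
            continuation?.resume()
        }
    }

    public func receive(completion: Subscribers.Completion<Failure>) {
        Task {
            await receive(compl: completion)
        }
    }
}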

OK, the API is done, along with a few simple parts of the logic that call into the bulk of the actual logic.
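To see what returning a `.none` demand means in isolation, here is a small sketch of a subscriber that never asks for more from `receive(_:)` and instead requests one element at a time when told to. `OneAtATime` and `pull()` are hypothetical names; in the Iterator above, the asking happens inside the inner actor's implementation instead of through a method like this.

```swift
import Combine

/// Hypothetical pull-based subscriber: it always returns `.none` from `receive(_:)`
/// and only asks for another element when `pull()` is called.
final class OneAtATime<Input>: Subscriber {
    typealias Failure = Never

    private var subscription: Subscription?
    private(set) var received: [Input] = []
    private(set) var isFinished = false

    func receive(subscription: Subscription) {
        // Store the subscription but request nothing yet.
        self.subscription = subscription
    }

    func receive(_ input: Input) -> Subscribers.Demand {
        received.append(input)
        return .none // no extra demand: the publisher waits for the next pull()
    }

    func receive(completion: Subscribers.Completion<Never>) {
        isFinished = true
        subscription = nil
    }

    /// Ask the upstream for exactly one more element.
    func pull() {
        subscription?.request(.max(1))
    }
}
```

Subscribing to `[1, 2, 3].publisher` delivers nothing until the first `pull()`, and then exactly one value per call.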

Iterator and InnerActor

Apologies for the switch in format, but I think the gist will be easier to read for this bigger code block. This is all the private implementation of the Iterator. What I would really like is to remove the InnerActor completely and make the Iterator an actor instead of a class, but that needs to wait until the bug I mentioned is fixed.

AsyncSequence -> Combine - equivalence (Part 1)

One of the interesting things from WWDC this year is not just that Swift is adopting AsyncSequence but that Apple is fully behind it, rather than further expanding support for Combine. This raises some interesting questions about the future of Combine: whether it is destined to be a niche feature supporting SwiftUI in particular ways, or gets completely deprecated in future. This series (possibly just a pair) of blog posts is going to compare the two by implementing mappers in both directions between them.

[Update: Marin Todorov has a nice post about the reverse if you want to get ahead on that.] 

if you want to have

While I haven't had the opportunity to use it in a real project yet, I'm really liking the look of AsyncSequence and the whole structured concurrency approach.

These mappers will be implemented as raw conformances because the point is for me to learn and understand more. There may be better, more efficient or simpler approaches but I want to look into the details of how they work. I'll follow up if I find significantly better ways to do it.

Resuming continuations from actors doesn't work (Beta1-2)

One thing that has delayed this post is that, having hit some non-deterministic bugs, I lost confidence in my understanding of Structured Concurrency and didn't want to share widely until I had established that the issues I was seeing were caused by Swift/macOS bugs rather than a deadlock I was introducing by doing something substantially wrong.

Swift AsyncSequence Performance Experimentation

[UPDATE 16/8/2021: Xcode/iOS beta 5 seems to make a massive jump in performance. I'll blog again when I get round to it, but wow. At first glance, appending byte by byte asynchronously appears to be only 3x slower than a synchronous call to load the data, and the XOR tests are actually faster now in my initial testing.

Updated conclusion: if you can drop iOS 14 support, there is no performance reason not to use AsyncSequence-style operations on files, and lots of positive reasons to use them.]

At WWDC 2021, amongst the other async talks, there was the Meet AsyncSequence talk, which among other things demonstrated new APIs for getting async sequences of bytes from files or URLs. I was curious how they would perform, so I did a few tests. Note that the FileHandle and URL bytes properties do not seem to be working in Xcode 13 beta 1, but the version on URLSession does seem to work and can be used for files (see the updates on my last post).

This is a very first pass with some rough numbers to get a ballpark feel. This is iOS 15 beta 1 (on an iPhone XS) and Xcode 13 beta 1. Testing was done with the -O (speed) optimisation setting. The tests themselves are at the end of this post. I configured the tests to run only once because first-run performance was consistently slower, and I thought that was therefore more representative of reading a file that is not already loaded.

As a baseline I included a pure synchronous read into a Data object. The AsyncSequence approach should not get close to the performance of this, as it has to do a great deal more context switching, or at least more function calls, whereas the baseline is straight-line code on a single thread.
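Roughly, the two shapes being compared look like the following. This is a sketch assuming the APIs as shipped in iOS 15 / macOS 12, not the exact test code from this post, and the function names are hypothetical. It uses `URL.resourceBytes`; as noted above, the URL and FileHandle bytes properties were not working in Xcode 13 beta 1.

```swift
import Foundation

/// Baseline: one synchronous call that loads the whole file into Data.
func readSync(_ url: URL) throws -> Data {
    try Data(contentsOf: url)
}

/// AsyncSequence approach: iterate over the file's bytes, appending one at a time.
@available(macOS 12.0, iOS 15.0, *)
func readAsyncBytes(_ url: URL) async throws -> Data {
    var data = Data()
    for try await byte in url.resourceBytes {
        data.append(byte)
    }
    return data
}
```

Every byte in the async version goes through the sequence's iterator, which is where the extra per-byte overhead comes from compared with the single bulk read.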

Results

Am I holding it wrong? - AsyncSequence file reading experiments with iOS 15 beta 1

[Update: Compiler bug in Xcode 13 beta 1 confirmed. See update below.]

[Update2: New post up with some rough and ready benchmarking of AsyncSequence file reading using URLSession]

So what I wanted to know was how to efficiently and asynchronously read a file into a Data object using Swift's new structured concurrency approach. The short version is that I didn't manage to get the async approaches to file reading working. I'm not sure yet whether I was doing it wrong or there are currently problems with the frameworks (I have filed a bug report with Apple - FB9177012). Either way, the async approaches were crashing for me, and I do welcome responses letting me know what I'm doing wrong.


I couldn't see any APIs that looked exactly right for asynchronously reading a file. The Meet AsyncSequence talk from WWDC gave one possibility, handling the URL as an AsyncSequence of bytes; it didn't look very efficient, but I thought I would do some investigation to check. As you can see at the end of this post, I later realised that URLSession provides the method I need for a one-shot request (that also wasn't working). I would still like to test how the performance of all these approaches varies.
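A sketch of that one-shot URLSession request applied to a file URL, assuming the async `URLSession.data(from:delegate:)` API as it shipped in iOS 15 / macOS 12 (in beta 1 this is the call that was failing). The wrapper function name is hypothetical.

```swift
import Foundation

/// One-shot async read through URLSession, which also accepts file URLs.
@available(macOS 12.0, iOS 15.0, *)
func readWithURLSession(_ url: URL) async throws -> Data {
    // The URLResponse is ignored here; for a file URL only the data matters.
    let (data, _) = try await URLSession.shared.data(from: url)
    return data
}
```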

Update

Confirmation that it is a beta 1 bug, from someone who worked on some of the AsyncSequence stuff at Apple.

End update

MicroInjection - Tiny Dependency Injection Package

This is a follow up to my previous post (How does the SwiftUI Environment work and can it be used outside SwiftUI for Dependency Injection?).

Since writing that post I've refined the MicroInjection package a bit. The core library is still a single file, but with a couple of new features and fairly decent documentation included, it is now about 100 lines long.

[Apologies for the lack of code snippets, they are a real fiddle on Posthaven. I might move blogging to my own server at some point but I need to find time to migrate things].

[Update 20th Feb 2021: Package 1.0.0 release.]

Additional Features since Previous Post