AsyncSequence <-> Combine - equivalence (Part 2a)

This article is mostly for the historical record. I will try to publish an update without the inner actor stuff (no longer needed in Xcode 13 beta 5). Having written it, though, it seems a shame to just delete it.

In part 1 I created an extension on AsyncSequence to return a Combine publisher representing the sequence. In this post I'm exploring the other direction: wrapping a Combine publisher in an AsyncSequence. This is actually a stopgap article covering the version written before the fixes in Xcode 13 beta 5 for the bug that prevented reliably resuming continuations from within an actor.

Like the previous post, the aim was substantially to learn how these features work rather than to produce the simplest, fastest or best solution. If you really need this type of operation you should probably base it on AsyncStream, which wraps up and simplifies a lot of the complexity. Marin Todorov has written his own pair of blog posts covering the AsyncStream approach.
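For comparison, the AsyncStream route might look roughly like the sketch below. This is not Marin Todorov's code and not the approach used in the rest of this post. `streamSketch` is a hypothetical name, the sketch assumes the Swift 5 language mode (stricter Sendable checking may complain about capturing the cancellable), and unlike the iterator built later it lets `sink` request unlimited demand and relies on the stream's buffer.

```swift
import Combine

extension Publisher where Failure == Error {
    // `streamSketch` is a hypothetical name used only for this illustration.
    var streamSketch: AsyncThrowingStream<Output, Error> {
        AsyncThrowingStream { continuation in
            // `sink` requests unlimited demand; the stream buffers values
            // until the consumer asks for them.
            let cancellable = self.sink(
                receiveCompletion: { completion in
                    switch completion {
                    case .finished:
                        continuation.finish()
                    case .failure(let error):
                        continuation.finish(throwing: error)
                    }
                },
                receiveValue: { value in
                    continuation.yield(value)
                }
            )
            // Cancel the Combine subscription when the stream terminates.
            continuation.onTermination = { _ in cancellable.cancel() }
        }
    }
}
```

Something like `for try await value in somePublisher.streamSketch { ... }` would then consume it, where `somePublisher` is any publisher whose failure type is Error.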

With both 

Complicated by bug SR-14875

My solution is somewhat complicated by this bug in the interaction between actors and resuming continuations from their context. The workaround is similar to the one in the previous post: the iterator is a class instead of an actor and owns an actor containing the state (the subscriber and the continuations). This is resolved in Xcode 13 beta 5, so this article will very shortly be superseded by one showing the simplified version.

Implementation

This isn't necessarily described in the order I implemented it. It evolved with a few breaks while I looked into the issues I was seeing with resuming continuations, and one of the last things implemented was a change from type erasing the publisher with AnyPublisher to doing it properly using generics. You can see the history if you want to see how it evolved.

The API

The primary API we want to provide is an asyncSequence property on any Publisher. For now I limit it to publishers with a failure type of Error. This could reasonably be duplicated for Never; any other error type would need either folding the error into the element type (some sort of Result) or mapping to Error.

extension Publisher where Self.Failure == Error {
    // Wraps this publisher so that it can be consumed with `for try await`.
    public var asyncSequence: PublisherAsyncSequence<Self> {
        PublisherAsyncSequence(publisher: self)
    }
}
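Until a Never variant exists, a caller can adapt other failure types on their side. The sketch below uses only standard Combine operators; `ParseError` is a hypothetical error type made up for the illustration. Both resulting publishers have a failure type of Error, so they would satisfy the constraint on the extension above.

```swift
import Combine

// `ParseError` is a hypothetical custom error type for this illustration.
struct ParseError: Error {}

// A publisher that cannot fail: widen `Never` to `Error`.
let widened = Just(1).setFailureType(to: Error.self)

// A publisher with a specific error type: erase it to `Error`.
let erased = Fail<Int, ParseError>(error: ParseError())
    .mapError { $0 as Error }
```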

The PublisherAsyncSequence itself is the next thing to define. I'll define the necessary extensions for protocol conformances afterwards.

public struct PublisherAsyncSequence<P> where P : Publisher, P.Failure == Error {
    init(publisher: P) {
        self.pub = publisher
    }

    // The wrapped publisher; it is subscribed to when an iterator is created.
    let pub: P

    public typealias Element = P.Output
    public typealias Failure = P.Failure
}

Still on the public API, we now need to implement the AsyncSequence protocol. This is fairly simple but leaves us with the loose end of actually implementing the real work within the iterator (which we will make a nested class on PublisherAsyncSequence, although there are other options).

extension PublisherAsyncSequence : AsyncSequence {
    public func makeAsyncIterator() -> Iterator {
        let itr = Iterator()
        // Attach the iterator as the publisher's subscriber.
        pub.receive(subscriber: itr)
        return itr
    }
}

The iterator will need to conform to AsyncIteratorProtocol, and this itself is actually fairly simple (again, the bulk of the logic that does the actual work is described below). The next() call is async, so it can await the real work, which is delegated to an actor within the iterator (iActor is my variable name for innerActor).

extension PublisherAsyncSequence.Iterator : AsyncIteratorProtocol {
    public typealias Element = P.Output

    public func next() async throws -> Element? {
        // The inner actor owns the state and does the real work.
        try await iActor.next()
    }
}
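The same shape (a class iterator whose next() just awaits an actor that owns the state) can be seen in isolation, without Combine. This is only a sketch of the structure: `Counter` and `State` are hypothetical names and nothing here comes from the real implementation.

```swift
// Hypothetical example: an AsyncSequence counting from 1 up to `limit`.
struct Counter: AsyncSequence {
    typealias Element = Int
    let limit: Int

    // The actor owns the mutable state, so access to it is serialised.
    actor State {
        private var current = 0
        private let limit: Int

        init(limit: Int) { self.limit = limit }

        func next() -> Int? {
            guard current < limit else { return nil }
            current += 1
            return current
        }
    }

    // The iterator is a class that only forwards to the actor.
    final class Iterator: AsyncIteratorProtocol {
        private let state: State

        init(limit: Int) { state = State(limit: limit) }

        func next() async -> Int? {
            await state.next()
        }
    }

    func makeAsyncIterator() -> Iterator {
        Iterator(limit: limit)
    }
}
```

Because this iterator's next() does not throw, the sequence can be consumed with a plain `for await value in Counter(limit: 3)`.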

The iterator needs to conform to Subscriber so that it can pull data out of the publisher. The main detail worth noting here is that we always return a demand of .none so that the publisher won't provide another value until we ask for one. All the actual operations are done elsewhere, asynchronously, using `Task` (which replaces the `async` that was the initial definition in the proposal and WWDC talks), because the operations have to take place in the actor context to prevent race conditions.

In receive(subscription:) you can see that a continuation is returned by the call that sets the subscription on the actor, and is then immediately resumed. The reason is that if next() is called on the sequence before the subscription is set up, that call must be made to wait. Ideally the continuation would be resumed from within the actor rather than returned, but due to current bugs (as of beta 3: SR-14802 and the duplicate that I reported) there are frequent hangs if an actor resumes a continuation.

extension PublisherAsyncSequence.Iterator : Subscriber {
    public typealias Input = P.Output
    public typealias Failure = P.Failure

    public func receive(_ input: Element) -> Subscribers.Demand {
        Task {
            await receive(input: input)
        }
        // Never ask for more here; the publisher waits until we request a value.
        return .none
    }

    public func receive(subscription: Subscription) {
        Task {
            // The actor hands back any waiting continuation so that it is
            // resumed here, outside the actor.
            let continuation = await self.iActor.setSubscription(subscription: subscription)
            continuation?.resume()
        }
    }

    public func receive(completion: Subscribers.Completion<Failure>) {
        Task {
            await receive(compl: completion)
        }
    }
}
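To see the effect of returning .none on its own, here is a small synchronous sketch. `OneAtATimeSubscriber` and `requestOne()` are hypothetical names, and the subscriber is deliberately much simpler than the iterator: it is not thread safe and does nothing with the completion.

```swift
import Combine

// Hypothetical subscriber that only receives a value when explicitly asked.
final class OneAtATimeSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private var subscription: Subscription?
    private(set) var received: [Int] = []

    func receive(subscription: Subscription) {
        // Keep the subscription but request nothing yet.
        self.subscription = subscription
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        // No additional demand: the publisher must wait for `requestOne()`.
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {}

    // Ask the publisher for exactly one more value.
    func requestOne() {
        subscription?.request(.max(1))
    }
}

let subscriber = OneAtATimeSubscriber()
[1, 2, 3].publisher.subscribe(subscriber)
```

Nothing is delivered on subscription; each call to `subscriber.requestOne()` should pull exactly one value from the array publisher.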

OK, the API is done, along with a few simple parts of the logic that call into the bulk of the actual logic.

Iterator and InnerActor

Apologies for the switch in format, but I think the gist will be easier to read for this bigger code block. This is all the private implementation of the Iterator. What I would really like would be to remove the InnerActor completely and make the Iterator an actor instead of a class, but that needs to wait until the bug I mentioned is fixed.
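Separately from the gist, the hand-back workaround described earlier (the actor returns a parked continuation and the caller resumes it) can be sketched on its own. This is an assumption-laden illustration rather than the real inner actor: `SubscriptionGate`, `waitUntilReady()` and `markReady()` are hypothetical names, and only a single waiter is supported.

```swift
// Hypothetical actor that parks one waiter until it is marked ready.
actor SubscriptionGate {
    private var isReady = false
    private var waiter: CheckedContinuation<Void, Never>?

    // Called by the consumer: suspends until `markReady()` has been called.
    func waitUntilReady() async {
        if isReady { return }
        await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
            waiter = continuation
        }
    }

    // Called by the producer: records readiness and hands any parked
    // continuation back to the caller instead of resuming it here.
    func markReady() -> CheckedContinuation<Void, Never>? {
        isReady = true
        defer { waiter = nil }
        return waiter
    }
}
```

The caller would then do something like `let parked = await gate.markReady()` followed by `parked?.resume()`. With the actor bug fixed, the resume could presumably move inside `markReady()` and the return value would disappear.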