AsyncSequence -> Combine - equivalence (Part 1)

One of the interesting things from WWDC this year is not just that Swift is adopting AsyncSequence but that Apple is fully behind it, rather than further expanding support for Combine. This raises some interesting questions about the future of Combine: whether it is destined to be a niche feature supporting SwiftUI in particular ways, or whether it will be completely deprecated in future. This series (possibly just a pair) of blog posts is going to compare the two by implementing mappers between them in both directions.

[Update: Marin Todorov has a nice post about the reverse if you want to get ahead on that.] 


While I haven't had the opportunity to use it in a real project yet, I'm really liking the look of AsyncSequence and the whole structured concurrency approach.

These mappers will be implemented as raw conformances because the point is for me to learn and understand more. There may be better, more efficient or simpler approaches but I want to look into the details of how they work. I'll follow up if I find significantly better ways to do it.

Resuming continuations from actors doesn't work (Beta 1-2)

One thing that has delayed this post is that, having hit some non-deterministic bugs, I lost confidence in my understanding of Structured Concurrency and didn't want to share widely until I had established that the issues I was seeing were caused by Swift/macOS bugs rather than by a deadlock I was introducing through doing something substantially wrong.

For now, if you use an actor with continuations (which will often be sensible to ensure there aren't race conditions), make sure that the actor returns the continuation so that it is resumed outside of the actor context. Otherwise the code can deadlock and hang, never returning from an await. There are bugs for this in the Swift bug tracker: SR-14875 and SR-14841.
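A minimal sketch of that workaround, using a hypothetical Gate actor (Gate, wait(), unlock() and unlockGate(_:) are illustrative names, not from the repo). Callers park on continuations held by the actor, and unlocking hands those continuations back to the caller, which resumes them outside the actor's isolation.

```swift
// Hypothetical Gate actor illustrating the workaround: the actor returns parked
// continuations to its caller instead of resuming them itself.
actor Gate {
    private var isUnlocked = false
    private var waiters: [CheckedContinuation<Void, Never>] = []

    func wait() async {
        guard !isUnlocked else { return }
        // The closure runs synchronously on the actor, so appending is safe.
        await withCheckedContinuation { waiters.append($0) }
    }

    // Marks the gate as unlocked and returns the parked continuations unresumed.
    func unlock() -> [CheckedContinuation<Void, Never>] {
        isUnlocked = true
        defer { waiters.removeAll() }
        return waiters
    }
}

// Resumes the continuations at the call site, outside the actor context.
func unlockGate(_ gate: Gate) async {
    let parked = await gate.unlock()
    for continuation in parked {
        continuation.resume()
    }
}
```

The same shape, an actor method that returns the continuation and a caller that resumes it, is what addDemand does in the publisher described later.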

Bugs are clearly to be expected at early beta stages, and I'm sure this will be resolved before release, but they can still be confusing and frustrating, especially before you become certain that they are a bug in the framework rather than your own bug or a conceptual misunderstanding.

AsyncSequence.publisher

The first step in showing an equivalence was to develop a Combine Publisher that wraps an AsyncSequence. I'm reasonably confident that I have one that broadly works, although it could definitely do with more testing and I don't know whether its performance will be efficient. This was principally a learning exercise rather than something I needed for production.

The Repo

Code, including a trivial test, is available here. It is in Swift Package form, but that doesn't mean you should just import it into your project at this point: it is currently a place for my experimentation rather than something published for widespread use.

Apart from the tests, the only other thing currently in the repo is a timer AsyncSequence, because I needed a sequence to test with and I was hitting the issues in the URL and FileHandle sequences (known Beta 1 issues, but at the time I didn't know the workaround of using the URLSession APIs to read files as AsyncSequences).
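A timer-style AsyncSequence is easy to sketch with Task.sleep(nanoseconds:). The TimerSequence below is hypothetical and not the type in the repo: it assumes a fixed count of elements and a sleep before each one, which is enough to give a finite, time-spaced sequence to test a publisher against.

```swift
// Hypothetical TimerSequence (not the repo's type): yields 0, 1, 2, ... for
// `count` elements, sleeping `intervalNanoseconds` before each one.
struct TimerSequence: AsyncSequence {
    typealias Element = Int

    let intervalNanoseconds: UInt64
    let count: Int

    struct AsyncIterator: AsyncIteratorProtocol {
        let intervalNanoseconds: UInt64
        let count: Int
        var tick = 0

        mutating func next() async throws -> Int? {
            guard tick < count else { return nil }  // finite: ends after `count` ticks
            // Throws CancellationError if the consuming task is cancelled.
            try await Task.sleep(nanoseconds: intervalNanoseconds)
            defer { tick += 1 }
            return tick
        }
    }

    func makeAsyncIterator() -> AsyncIterator {
        AsyncIterator(intervalNanoseconds: intervalNanoseconds, count: count)
    }
}
```

Because Task.sleep throws when the task is cancelled, cancelling the consumer ends iteration with a CancellationError, so a sequence like this can exercise both the completion path and the cancellation path.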

My tentative plan is to implement the reverse mapping next, and then to add to AsyncSequence some of the kinds of operators that already exist in Combine.

Implementation


The Code

Let's start by looking at the full code for the AsyncSequencePublisher, and then I'll pick out a couple of particular pieces to explain.
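As a point of reference, here is a minimal sketch of such a publisher written against the released Swift concurrency APIs; it is not the code from the repo. It follows the structure described below (an ASPSubscription actor with mainLoop, waitUntilReadyForMore and addDemand), while start(), startLoop(), cancelLoop(), takeWaiter() and the publisher extension property are hypothetical names chosen for this sketch. It needs Combine, so it assumes an Apple platform.

```swift
import Combine

// Sketch only (not the repo's code): a Publisher wrapping any AsyncSequence.
// Failure is Error because the sequence's iterator may throw any error.
struct AsyncSequencePublisher<AsyncSequenceType: AsyncSequence>: Publisher {
    typealias Output = AsyncSequenceType.Element
    typealias Failure = Error

    let sequence: AsyncSequenceType

    func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Error, S.Input == Output {
        let subscription = ASPSubscription(sequence: sequence, subscriber: subscriber)
        subscriber.receive(subscription: subscription)
        subscription.start()
    }

    // One subscription per subscriber, each driving its own iterator of the sequence.
    fileprivate actor ASPSubscription<S>: Subscription
        where S: Subscriber, S.Failure == Error, S.Input == AsyncSequenceType.Element {

        private let sequence: AsyncSequenceType
        private let subscriber: S
        private var demand: Subscribers.Demand = .none
        private var waiter: CheckedContinuation<Void, Never>?
        private var task: Task<Void, Never>?
        private var isCancelled = false

        init(sequence: AsyncSequenceType, subscriber: S) {
            self.sequence = sequence
            self.subscriber = subscriber
        }

        // Subscription's requirements are synchronous, so these are nonisolated
        // and hop onto the actor inside a Task.
        nonisolated func start() { Task { await self.startLoop() } }

        nonisolated func request(_ newDemand: Subscribers.Demand) {
            // The continuation comes back from the actor and is resumed out here.
            Task { await self.addDemand(newDemand)?.resume() }
        }

        nonisolated func cancel() { Task { await self.cancelLoop() } }

        private func startLoop() {
            guard !isCancelled, task == nil else { return }
            task = Task { await self.mainLoop() }
        }

        private func cancelLoop() {
            isCancelled = true
            task?.cancel()
        }

        private func mainLoop() async {
            await withTaskCancellationHandler {
                do {
                    for try await element in sequence {        // 1. next element
                        await waitUntilReadyForMore()           // 2. wait for demand
                        if Task.isCancelled { return }          // 3. stop silently
                        demand -= 1                             // 4. one unit consumed,
                        demand += subscriber.receive(element)   //    plus any it returns
                    }
                    // 5. and 6., skipped when the loop ended because of our own cancellation.
                    if !Task.isCancelled { subscriber.receive(completion: .finished) }
                } catch {
                    if !Task.isCancelled { subscriber.receive(completion: .failure(error)) }
                }
            } onCancel: {
                // Wake the loop if it is parked waiting for demand that will never come.
                Task { await self.takeWaiter()?.resume() }
            }
        }

        private func waitUntilReadyForMore() async {
            while demand == .none && !Task.isCancelled {
                await withCheckedContinuation { waiter = $0 }
            }
        }

        // Demand is additive; the parked continuation is returned, not resumed here.
        private func addDemand(_ extra: Subscribers.Demand) -> CheckedContinuation<Void, Never>? {
            demand += extra
            return demand > 0 ? takeWaiter() : nil
        }

        private func takeWaiter() -> CheckedContinuation<Void, Never>? {
            defer { waiter = nil }
            return waiter
        }
    }
}

// Hypothetical convenience property matching the AsyncSequence.publisher heading.
extension AsyncSequence {
    var publisher: AsyncSequencePublisher<Self> { AsyncSequencePublisher(sequence: self) }
}
```

Running mainLoop on the actor itself is one design choice among several: the demand returned by receive(_:) can then be added without a separate await, whereas in the step-by-step description below that update is another await on the actor.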

ASPSubscription

As implemented, the core of the behaviour lives not inside the AsyncSequencePublisher itself but in the ASPSubscription, which manages a single subscription to the publisher and tracks progress through the async sequence. In the mapping, each subscription corresponds to a particular iterator on the AsyncSequence side.

The first thing of interest in the subscription is its type. As implemented it is an actor, because access to the subscriber's demand for more items has to be coordinated with the consumption of that demand as values are retrieved. It also reveals a limitation of the mapping to Publisher: as far as I can see, the Failure type has to be Error. I don't think an AsyncSequence can be guaranteed to be non-throwing, and because it throws, the error type is always Error (this also carries through to the Failure type of AsyncSequencePublisher).

fileprivate actor ASPSubscription<S> : Subscription
    where S : Subscriber, S.Failure == Error, S.Input == AsyncSequenceType.Element {

mainLoop

The real work of the iteration is done in the mainLoop function, or more precisely, mainLoop starts the async task that runs the loop and stores its taskHandle on the ASPSubscription so that it can be cancelled by the cancel method that Subscription requires. It runs the bulk of the operation inside a `withTaskCancellationHandler` call so that it is not left blocked waiting for demand that won't come, or holding onto resources.

The actual loop over the sequence, which is the main focus here, has a couple of complications but isn't too bad:

  1. The first thing it does is await the next element from the sequence.
  2. Then it awaits the call to waitUntilReadyForMore (details later, but it is responsible for checking the current demand from the subscriber).
  3. Then it checks for cancellation of the task before actually delivering anything. Note that it returns rather than throws on cancellation, because it doesn't want to send anything else to the subscriber after cancellation.
  4. It delivers the element to the subscriber (with subscriber.receive(element)) and uses the return value to update the expected demand. This is another await because it needs to set the value on the actor (there is possibly an optimisation to only await setDemand when the response will actually add demand).
  5. If it reaches the end of the loop, it calls subscriber.receive(completion: .finished).
  6. If an error is thrown at any point, it calls subscriber.receive(completion: .failure(error)). It might be that this should check for Task.CancellationError and not send a failure in that case.
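One way to act on the question in step 6 is to map the caught error to a completion and return nil for cancellation. The completion(for:) helper below is hypothetical and assumes the standard library's CancellationError type; note that a sequence could surface cancellation as some other error, so checking Task.isCancelled is a complementary approach rather than a replacement.

```swift
import Combine

// Hypothetical helper: chooses the completion to send when mainLoop catches an error.
// Returns nil for cancellation, because nothing should reach the subscriber after cancel().
func completion(for error: Error) -> Subscribers.Completion<Error>? {
    if error is CancellationError {
        return nil
    }
    return .failure(error)
}
```

In the catch block of mainLoop this would be used as: if let outcome = completion(for: error) { subscriber.receive(completion: outcome) }.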

Demand management

This part probably caused me the most trouble, because I wasn't familiar with how demand works for Combine publishers (and then I hit the hanging bugs when resuming continuations from actors). The key thing I didn't know is that demand is additive: each new request adds to the existing demand rather than replacing it.
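Subscribers.Demand's arithmetic makes the additive behaviour easy to see. The outstandingDemand(after:delivered:) function below is a hypothetical helper that folds a list of requests into one running total, the way a subscription accumulates them, and then subtracts the delivered items; .unlimited absorbs anything added to it or subtracted from it.

```swift
import Combine

// Hypothetical helper: accumulates requests into one outstanding demand,
// then removes one unit per delivered element.
func outstandingDemand(after requests: [Subscribers.Demand], delivered: Int) -> Subscribers.Demand {
    var demand = Subscribers.Demand.none
    for request in requests {
        demand += request  // additive: a new request never replaces the previous total
    }
    demand -= delivered    // each delivered element uses up one unit
    return demand
}
```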

waitUntilReadyForMore is the method to await to ensure that there is demand to send the next item. It is on the actor, so it needs to be async to access the properties anyway, but if no more items are currently required it must also wait, indefinitely if necessary, until more are requested. It does this by awaiting a continuation which can be resumed whenever demand is added.

 

The addDemand function is the async counterpart to the nonisolated request function that fulfils the Subscription protocol, and it is also used in mainLoop to handle the demand returned when an element is sent to the subscriber. It runs on the actor and updates the demand, and then, if the demand is positive, it returns the continuation so that the caller can resume it (it should be possible for the actor to resume it itself, but that currently hangs when done from the actor context).

The Rest

Most of the rest of the type is about fulfilling the required protocols, and I think it is fairly clear once you understand the parts above. Check here for the full file.

Testing

Coverage is currently light: one test runs a limited sequence (using a timer-based sequence) through to completion and another tests cancellation. It could definitely do with more tests at some point. You can find the test code in the repo along with the timer AsyncSequence.