EnumerableEx.Createでusingを使いたい話

yield return などのような形で IEnumerable<T> を生成する Interactive Extensions のメソッド EnumerableEx.Create ですが、素直に使うとusingやfinallyなどの後始末が行われないケースが出てきます。

Interactive Extensions(Ix)のEnumerableEx.Createを使う時の注意点と対策 - PITABlog

上の記事では破棄すべきリソースなどをIYielder相当のクラスに登録することで問題を解決しています。

いや、でも、using使いたいじゃん。標準の yield return でも await でも扱えるんだし。

そうです、async/await自体は例外が飛ぼうと後始末を行える能力を持っているはずです。

ケース1. yield breakしたい

ちょうど IYielder.Break() というメソッドがあります。が、これを使うと、それ以降継続が呼び出されないので、当然後始末も行われません。

上のメソッドのことは忘れてreturnしましょう。

EnumerableEx.Create<int>(async y =>
{
    using (var hoge = new Resource())
    {
        await y.Return(100);
        return;//await y.Break();
        await y.Return(200);
    }
}).ForEach(Console.WriteLine);

IYield.Return() が呼ばれないままに continuation から戻った場合は列挙が終了したものと扱われるので、 IYield.Break() を呼ばずにジェネレータ関数を抜けることに何ら問題はありません。こうすれば通常のasyncメソッド同様に後始末も実行されます。

ケース2. 列挙を途中でやめたい

これはそもそも IEnumerable<T> だけでは実現できない話ですが、foreachに限って言えば IDisposable.Dispose() が呼ばれます。これを利用することで、列挙を中止した場合でも後始末を行うことができます。

非同期メソッドはTaskとして扱えますから、Taskのキャンセル処理と同じように、CancellationTokenを使うことができます。ただ非同期メソッドの戻り値の型がvoidだとOperationCancelledExceptionの処理もやってくれないので、Task型に変更します。

というかvoidな非同期メソッドはawaitできないので、外側で例外を捕まえること自体できない仕様です。元の実装でもそうなっていたので、後述するサンプルコードではMoveNextのタイミングでthrowしています。

非同期メソッドでCancellationTokenを使うと多分普通はこういう形になるわけですが・・・

async Task FooAsync(CancellationToken ct)
{
    await hoge.GetFooAsync(ct);
    await hoge.GetBarAsync(ct);
}

今の設計だとジェネレータ関数を呼び出すクラスが IYield.Return() も処理していますから、IYield.Return() にはCancellationTokenを渡さないことにします。またCancellationTokenはジェネレータ関数内で他の非同期メソッドを呼ぶときにも使えますが、foreachだけのことを考えると MoveNext() の途中でDisposeされることはないので*1、隠ぺいしてしまいます。

そんな感じでこうなりました。

interface IAwaitable : ICriticalNotifyCompletion
{
    IAwaitable GetAwaiter();
    bool IsCompleted { get; }
    void GetResult();
}

static class Yielder
{
    public static IEnumerable<T> Create<T>(Func<Func<T, IAwaitable>, Task> create)
    {
        foreach (var x in new Yielder<T>(create))
            yield return x;
    }
}

class Yielder<T>: IAwaitable, IDisposable
{
    private Action _continuation;
    private CancellationTokenSource _cts;
    private Task _task;

    public Yielder(Func<Func<T, IAwaitable>, Task> create)
    {
        _cts = new CancellationTokenSource();
        _continuation = () =>
            _task = create(v =>
            {
                this.Current = v;
                return this;
            });
    }

    public IAwaitable GetAwaiter() => this;
    public bool IsCompleted => false;
    [SecurityCritical]
    public void UnsafeOnCompleted(Action continuation) { _continuation = continuation; }
    public void OnCompleted(Action continuation) { _continuation = continuation; }
    public void GetResult() { _cts.Token.ThrowIfCancellationRequested(); }

    public Yielder<T> GetEnumerator() => this;
    public T Current { get; private set; }
    public void Reset() { throw new NotSupportedException(); }
    public bool MoveNext()
    {
        _continuation();
        // Task内で発生した例外をMoveNextのコンテキストで投げる
        if(_task.IsFaulted) throw _task.Exception.InnerException;
        return !_task.IsCompleted;
    }

    public void Dispose()
    {
        if (!_task.IsCompleted)
        {
            _cts.Cancel();
            // 初回のMoveNextの前に呼ばれることは考慮してない
            _continuation();
        }
        _task.Dispose();
    }
}

で、例えばこう。

Yielder.Create<int>(async yield =>
{
    using (var hoge = new Resource())
    {
        await yield(100);
        await yield(200);
        await yield(300);
    }
}).Take(2).ForEach(Console.WriteLine);

voidではなくTaskで受け取る最大の理由はOperationCanceledExceptionを処理してくれるからですが、終了判定を task.IsCompleted に頼ることでフラグ管理も省略できました。

ちなみにこのCancellationTokenSourceの使い方だと、boolかなにかでフラグを持っておいて、自分でOperationCanceledExceptionを投げても大丈夫だと思います。

あとがき

ところでまあ無理もないと言えば無理もないんですが、型パラメータ推論できないですね。これができると匿名型とか云々。

*1:外部からCancellationTokenを渡しておいて、非同期に列挙を中止する、といった場合には使える。