目次

概要

Ver. 8.0

C# 8.0 で、非同期メソッドが大幅に拡張されます。

一連のデータ(data stream)を、非同期に生成(イテレーター)して非同期に消費(foreach)する機能なので、これらを合わせて非同期ストリーム(async stream)と呼ばれます。

同期的な処理であれば、これまでもイテレーターforeachという機能がありました。 非同期ストリームはこれらの非同期版(非同期メソッドとの混在)になります。

IAsyncEnumerable

イテレーターforeachでは、IEnumerable<T>インターフェイス(System.Collections.Generic名前空間)が中心的な役割を担います。

  • イテレーターの戻り値はIEnumerable<T>もしくはIEnumerator<T>である必要がある
  • foreachパターン ベースで、 「IEnumerable<T>と同じメソッドを持つ」というのが満たすべきパターン

C# 8.0 ではこれらの非同期版が入るわけですが、 同期版と同じく中心的な役割を担うインターフェイスがあり、 それがIAsyncEnumerable<T>インターフェイス(System.Collections.Generic名前空間)です。 以下のような構造になっています。

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
}
 
public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    T Current { get; }
    ValueTask<bool> MoveNextAsync();
}
 
public interface IAsyncDisposable
{
    ValueTask DisposeAsync();
}

インターフェイス名とメソッド名にAsyncが付いたのと、一部のメソッドの戻り値がValueTask<T>になっているくらいで、ほとんど同期版と同じ構造です。

同期版と同じく、非同期ストリームと以下のような関わりがあります。

  • 非同期イテレーターの戻り値はIAsyncEnumerable<T>もしくはIAsyncEnumerator<T>である必要がある
  • 非同期foreachパターン ベースで、 「IAsyncEnumerable<T>と同じメソッドを持つ」というのが満たすべきパターン

ちなみに、同期版とは違って、非ジェネリックなIAsyncEnumerableIAsyncEnumeratorインターフェイスはありません。 (非ジェネリックなIEnumerableジェネリクス導入前の名残で、互換性のためだけに残されているものです。)

非同期foreach

仕組みが単純なので、データの消費側(非同期foreach)の方を先に説明します。 以下のように、await foreach と書くことで、 IAsyncEnumerable<T> (と同じパターンを持つ型)の列挙ができます。

static async Task AsyncForeach(IAsyncEnumerable<int> items)
{
    await foreach (var item in items)
    {
        Console.WriteLine(item);
    }
}

await演算子と同じく、 非同期メソッド(async 修飾が付いたメソッド)内でだけ使えます。

このコードは、同期版のforeachと似たような感じで、以下のように展開されます。 同期版と比べて、MoveNextDisposeが非同期になっただけです。

static async Task AsyncForeach(IAsyncEnumerable<int> items)
{
    var e = items.GetAsyncEnumerator();
    try
    {
        while (await e.MoveNextAsync())
        {
            int item = e.Current;
            Console.WriteLine(item);
        }
    }
    finally
    {
        if (e != null)
        {
            await e.DisposeAsync();
        }
    }
}

同期版と同じく、finally内の処理にはいくつかバリエーションがあります。

  • enumerator (上記の例で言う e)が構造体なら null チェックは挟まらない
  • DisposeAsync を持っていない場合はfinally内の処理自体消える

パターン ベース

パターン ベースなので、インターフェイスを実装していなくても、 所定のメソッドさえ持っていれば非同期foreachで使えます。 以下はその一例です。

using System;
using System.Threading.Tasks;
 
struct A
{
    // このメソッドが「Enumerable」の必須要件。
    // この例では自分自身を返している(それでもOK)ものの、通常は別の型を作って返す。
    public A GetAsyncEnumerator() => this;
 
    // 以下の2つが「Enumerator」の必須要件。
    public int Current => 0;
    public ValueTask<bool> MoveNextAsync()
    {
        Console.WriteLine("MoveNextAsync");
        return new ValueTask<bool>(false);
    }
 
    // DisposeAsync はなくてもいい。なければ呼ばれないだけ。
    public ValueTask DisposeAsync()
    {
        Console.WriteLine("DisposeAsync");
        return default;
    }
 
    // 同期の Dispose は定義してあっても呼ばれないので注意。
}
 
public class Program
{
    public static async Task Main()
    {
        await foreach (var x in new A()) ;
    }
}

この例ではValueTask型を使っていますが、これすらもパターン ベースで大丈夫です。 要は、await可能であれば型は問いません。

また、後から追加された構文だけあって、同期版のforeachよりもパターンの条件が緩いです。以下のように、オプション引数や可変長引数が付いていても平気です(同期版はダメ)。

using System.Threading;
using System.Threading.Tasks;
 
struct A
{
    // 可変長引数があってもいい
    public A GetAsyncEnumerator(params int[] dummy) => this;
 
    public int Current => 0;
 
    // オプション引数があってもいい。
    public ValueTask<bool> MoveNextAsync(CancellationToken token = default) => default;
}
 
public class Program
{
    public static async Task Main()
    {
        await foreach (var x in new A()) ;
    }
}

非同期using

前節の非同期foreachの展開結果にはDisposeAsyncの呼び出しが含まれていました。 また、IAsyncEnumerator<T>IAsyncDisposableから派生しています。

これは同期版の頃(foreachの展開結果)からある仕様で、同期版にもDisposeの呼び出しが含まれています。 この処理は、usingステートメントがやっていることと同じです。 すなわち、foreachusingを兼ねています。

ということで、 非同期foreachが追加するのであれば、 同時に非同期usingも追加するのが妥当です。 そこで実際、C# 8.0で非同期usingが追加されています。

非同期foreachと同様await usingという書き方をします。

static async Task AsyncUsing(IAsyncDisposable d)
{
    await using (d)
    {
        // d を破棄する前にやっておきたい処理
    }
}

これも非同期foreachと同様に、非同期メソッド(async 修飾が付いたメソッド)内でだけ使えます。

展開結果は、同期版でDispose()呼び出しだった部分がawait DisposeAsync()に変わっているだけです。 上記のコードは以下のように展開されます。

static async Task AsyncUsing(IAsyncDisposable d)
{
    try
    {
        // d を破棄する前にやっておきたい処理
    }
    finally
    {
        await d.DisposeAsync();
    }
}

パターン ベース

パターン ベースな構文」で説明していますが、同期版のusingは数少ない「インターフェイス実装が必須な構文」です。

一方、非同期usingはパターン ベースになっています。 以下のように、IAsyncDisposableインターフェイスを実装せず、 単にDisposeAsyncメソッドを持っていればawait usingで使えます。

using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
 
// 非同期 using は別に IAsyncDisposable インターフェイスの実装を求めない。
class AsyncDisposable
{
    // ちゃんと await using のブロックの最後で呼ばれる。
    // 戻り値の型が Task や ValueTask である必要もない。
    public MyAwaitable DisposeAsync()
    {
        Console.WriteLine("disposed async");
        return default;
    }
}
 
struct MyAwaitable { public ValueTaskAwaiter GetAwaiter() => default; }
 
class Program
{
    static async Task Main()
    {
        await using (new AsyncDisposable())
        {
            Console.WriteLine("inside using");
        }
    }
}

見ての通り、DisposeAsyncの戻り値はawait可能でさえあれば何でも構いません。

また、オプション引数や可変長引数があっても構いません。

using System.Threading.Tasks;
 
struct A
{
    public ValueTask DisposeAsync(int dummy = 0) => default;
}
 
struct B
{
    public ValueTask DisposeAsync(params int[] dummy) => default;
}
 
public class Program
{
    public static async Task Main()
    {
        await using (new A()) { }
        await using (new B()) { }
    }
}

制限と言えば、インスタンス メソッドしか受け付けない(拡張メソッドは使えない)くらいです。

一方で、同期版と違って、as演算子を使った動的な型チェックはしません。 以下のように、直接的にはIAsyncDisposableインターフェイスを実装していなくて、 パターンも満たさない型に対してawait usingを使うとコンパイル エラーになります。

using System;
using System.Threading.Tasks;
 
class A { }
 
class B : A, IAsyncDisposable
{
    public ValueTask DisposeAsync() => default;
}
 
public class Program
{
    public static async Task Main()
    {
        // A は IAsyncDisposable じゃないけど、
        // 派生クラスの B は IAsyncDisposable を実装。
        await AsyncUsing(new B());
    }
 
    static async Task AsyncUsing(A a)
    {
        // これはコンパイル エラーになる。
        // A が直接 IAsyncDisposable を実装しているか、パターンを満たしている必要がある。
        await using (a) { }
    }
}

ジェネリック型引数に対して使う場合にも、IAsyncDisposable制約が必要になります。

static async Task M<T>(T x)
    where T : IAsyncDisposable // この制約がないと await using の行でコンパイル エラーに。
{
    await using (x) { }
}

using変数宣言との併用

using変数宣言との併用も可能です。 以下のような書き方ができます。

using System.Threading.Tasks;
 
struct AsyncDisposable
{
    public ValueTask DisposeAsync() => default;
}
 
public class Program
{
    public static async Task Main()
    {
        await using var x = new AsyncDisposable();
 
        // このメソッドを抜けるタイミングで DisposeAsync が呼ばれる
    }
}

DisposeとDisposeAsyncの混在

ちなみに、Dispose(IDisposableインターフェイス)とDisposeAsync(IAsyncDisposableインターフェイス)の両方を実装をしている場合、それぞれ同期版using、非同期usingでしか呼ばれません。 非同期版が同期版を兼ねたりはしませんし、その逆もまたしかり。

以下の例では、usingの行ではDisposeだけが呼ばれますし、 await usingの行ではDisposeAsyncだけが呼ばれます。

using System;
using System.Threading.Tasks;
 
struct Disposable : IDisposable, IAsyncDisposable
{
    public void Dispose() => Console.WriteLine("同期 Dispose");
    public ValueTask DisposeAsync()
    {
        Console.WriteLine("非同期 Dispose");
        return default;
    }
}
 
public class Program
{
    public static async Task Main()
    {
        var d = new Disposable();
 
        // Dispose だけが呼ばれる
        using (d) { }
 
        // DisposeAsync だけが呼ばれる
        await using (d) { }
    }
}

非同期イテレーター

非同期foreachとは逆の、データの生成側の機能が非同期イテレーターです。 簡単に言うと、yieldawaitの混在ができるようになりました。

例えば以下のような書き方で、1秒に1回、整数値を生成するイテレーターになります。

static async IAsyncEnumerable<int> GenerateAsync()
{
    for (int i = 0; ; i++)
    {
        yield return i;
        await Task.Delay(TimeSpan.FromSeconds(1));
    }
}

同期版のイテレーター(yield)は以下のような条件を満たすものでした。

  • 関数(メソッドなど)の本体の中にyield returnもしくはyield breakを含む
  • 戻り値の型は IEnumerableIEnumerator(System.Collection名前空間)、もしくは、IEnumerable<T>IEnumerator<T>(System.Collection.Generic名前空間)のいずれか

また、非同期メソッドは以下のようなものです。

  • メソッドにasync修飾子が付いている
    • この場合に限り、メソッド内にawait演算子を書ける

非同期イテレーターはこれらの組み合わせなので、以下のようなものになります。

  • 関数(メソッドなど)の本体の中にyield returnもしくはyield breakを含む
  • 戻り値の型は IAsyncEnumerable<T>IAsyncEnumerator<T>(System.Collection.Generic名前空間)のいずれか
  • メソッドにasync修飾子が付いている
    • メソッド内にawait演算子を書ける

非同期イテレーターのコンパイル結果

非同期イテレーターの仕組みは、同期版のイテレーターや非同期メソッドの延長線上にあります。 それぞれについては以下のページで説明しています。

これら2つは原理的には非常に似ています。 というより、非同期メソッド自体、イテレーターから着想を得て作られた機能です。 なので、パフォーマンス チューニングなど細かい点を除けば、組み合わせることはそれほど難しくはありません。 (ただ、いずれも元々相当複雑なコード生成になるので、 組み合わせた上でパフォーマンスにも配慮すると結構難解なコード生成になります。)

原理だけ簡単に説明すると、非同期イテレーター中にyield return xと書くと、 概ね以下のようなコードが生成されます。

_state = State1;             // 次に復帰するときのための状態の記録
Current = x;                 // 戻り値を Current に保持
_taskSource.SetResult(true); // MoveNextAsync の戻り値で返した Task を完了させる
return;                      // 一旦処理終了
case: State1:                // 時宜に呼ばれたときに続きから処理するためのラベル

(同期版での説明と同様、疑似コードです。実際の C# では case に変数は使えないので、 「これに相当する goto が生成される」くらいのものだと思って読んでください。)

_taskSourceは、現状の実装ではManualResetValueTaskSourceCoreという型を使っています。 既存の型で言うとTaskCompletionSource<T>と似た型というか、用途的には完全に同じで、 パフォーマンス最適化のために導入された構造体です。 (パフォーマンスはいいですが、使い勝手は少し煩雑になります。)

余談: 文脈キーワード

C# は後方互換性を非常に重要視する言語なので、 yieldawait文脈キーワードになっています。 例えば、以下のようなコードではyieldawaitがキーワード扱いされず、 普通に変数として使えています。

static void M()
{
    var yield = 2;
    var await = 3;
    Console.WriteLine(yield * await);
}

ただ、この2つは文脈の作り方が異なります。

  • yieldは、yield returnもしくはyield breakというように、2単語が並んだ場合だけキーワード扱いされる
    • このキーワードを含んだ時点でイテレーター扱いされる
  • awaitは、メソッド自体にasync修飾子が付いているときだけキーワード扱いされる
    • async修飾子は「awaitがキーワードになるかどうか」の目印的な意味しかない

非同期イテレーターの導入にあたって、 方式が異なる2つのものを混ぜることに対する懸念もありました。 yieldの「含んだ時点でイテレーターになる」というのはコンパイラーにとって結構負担があるらしく、匿名関数をイテレーターに出来ないという問題があったりもします。 そのため、「イテレーターにもiterator修飾子みたいなものを足そうか」という話が出たこともあります。 しかし、「同じ用途の別文法」を作ってしまう混乱を起こしてまで実現する課題ではないという判断になり、結局2方式の混在が採用されました。

キャンセル

IAsyncEnumerable<T>インターフェイスのGetAsyncEnumeratorメソッドにはCancellationTokenを渡せるようになっていて、これを使って非同期処理の途中キャンセルをする想定になっています。

非同期イテレーターでは、以下のように、引数にEnumeratorCancellation属性(System.Runtime.CompilerServices名前空間)を付けることでこのCancellationTokenを受け取れるようになります。

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
 
public class Program
{
    static async Task Main()
    {
        var cts = new CancellationTokenSource();
 
        var enumerable = GenerateAsync();
 
        // ここで引数に渡したトークンが、GenerateAsync の ct 引数にわたる。
        var enumerator = enumerable.GetAsyncEnumerator(cts.Token);
 
        // キャンセル前なので値が取れるはず。
        await enumerator.MoveNextAsync();
        Console.WriteLine(enumerator.Current);
 
        cts.Cancel();
 
        // キャンセルしたので止まるはず。
        if (!await enumerator.MoveNextAsync())
            Console.WriteLine("終了");
    }
 
    // キャンセルが掛かるまでずっと、1秒に1個値を生成。
    static async IAsyncEnumerable<int> GenerateAsync([EnumeratorCancellation] CancellationToken ct = default)
    {
        var i = 0;
        while (!ct.IsCancellationRequested)
        {
            yield return i;
            await Task.Delay(TimeSpan.FromSeconds(1));
            ++i;
        }
    }
}

ちなみに、非同期foreachで使いたい場合、WithCancellation拡張メソッドが使えます。 WithCancellation の引数で渡したCancellationTokenGetAsyncEnumeratorに伝搬し、 最終的にGenerateAsyncct引数に渡ります。

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
 
public class Program
{
    static async Task Main()
    {
        // 5秒後にキャンセルが掛かる。
        var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
 
        // WithCancellation に渡したトークンが GenerateAsync まで伝搬する。
        await foreach (var i in GenerateAsync().WithCancellation(cts.Token))
        {
            Console.WriteLine(i);
        }
    }
 
    // キャンセルが掛かるまでずっと、1秒に1個値を生成。
    static async IAsyncEnumerable<int> GenerateAsync([EnumeratorCancellation] CancellationToken ct = default)
    {
        var i = 0;
        while (!ct.IsCancellationRequested)
        {
            yield return i;
            await Task.Delay(TimeSpan.FromSeconds(1));
            ++i;
        }
    }
}

引数越しに受け取る仕様なので、 以下のように、呼び出し側で引数に直接渡すのと、WithCancellation越しに渡すので、 2重にCancellationTokenを渡せます。 この場合、2個のうちどちらか片方でもCancelが掛かった時点でキャンセル扱いになります。 (正確に言うと、CreateLinkedTokenSource を使って新たに作ったCancellationTokenが渡ります。)

// CancellationToken を2個用意。
var ct1 = new CancellationTokenSource(TimeSpan.FromSeconds(3)).Token;
var ct2 = new CancellationTokenSource(TimeSpan.FromSeconds(5)).Token;
 
// 引数に直接渡せるし、WithCancellation でも渡せる。
// この場合、どちらか片方でも Cancel された時点でキャンセル扱い。
// (GenerateAsync には CreateLinkedTokenSource(ct1, ct2) した新しいトークンが渡る。)
await foreach (var i in GenerateAsync(ct1).WithCancellation(ct2))
{
    Console.WriteLine(i);
}

更新履歴

ブログ