.NET Frameworkの非同期処理のメモ

いつも、微妙に詳細がわからなくてググってばかりいるのでメモ。

参考文献

@ITのC# 5.0 & VB 11.0新機能「async/await非同期メソッド」入門

MSDNの非同期プログラミングのパターン、特に他の非同期パターンと型との相互運用

MSDNのTPLと他の非同期パターンの使用

xin9le.netの非同期メソッド入門

Nine Worksのasync awaitとIProgressを使ってみる

MSDNマガジンの非同期プログラミングのベスト プラクティス

APM (Asynchronous Programming Model)

IAsyncResult BeginXXX(), EndXXX(IAsyncResult)メソッドのペアを使う。

var method = new Func<double>(
() =>
{
Thread.Sleep(3000);
return Math.PI;
});
var ar0 = method.BeginInvoke(null, null);
var result0 = method.EndInvoke(ar0); // 終わるまで待つ
Console.WriteLine(result0);
Console.ReadLine();

または

var ar1 = method.BeginInvoke(
ar2 =>
{ // 終了後呼ばれる
var result1 = method.EndInvoke(ar2);
Console.WriteLine(result1);
}, null);
ar1.AsyncWaitHandle.WaitOne(); // 終わるまで待つ
Console.ReadLine();

EAP (Event-based Asynchronous Pattern)

XXXAsync() メソッド, XXXCompletedイベントのペアを使う。

var worker = new BackgroundWorker();
worker.DoWork
+= (sender, eventArgs) =>
{
Thread.Sleep(3000);
eventArgs.Result = Math.PI;
};
worker.RunWorkerCompleted
+= (sender, eventArgs) =>
{ // 終了後呼ばれる
var result2 = (double)eventArgs.Result;
Console.WriteLine(result2);
};
worker.RunWorkerAsync();
// どうやって待つ?
Console.ReadLine();

自前でXXXAsync()メソッドを作る際は、cancelを実装しましょう。

Task<返値型> XXXAsync( arg1, arg2,…, CancellationToken cancellationToken)
{
return Task.Run(() =>
{
cancellationToken.ThrowIfCancellationRequested();
return retval;
}, cancellationToken);
}

CancellationToken は引数で渡してもいいし、クラスで作っておいて、クラスのメソッドで、CancellationToken.Cancel()してもよし。

System.IProgress<T>をサポートして、IProgress<T>.Report()すると進捗状況が伝えられる。System.Progress<T>はコンストラクタでActionを指定しておくと、Report()したときに、受けられるので、そのActionでGUIを更新するべし。

TAP (Task-based Asynchronous Pattern)

TPL (Task Parallel Library)に含まれる、主に、System.Threading.Tasks.Taskクラスを使う。

var task0 = Task<double>.Factory.StartNew(
() =>
{
Thread.Sleep(3000);
return Math.PI;
});
task0.Wait();
var result3 = task0.Result;
Console.WriteLine(result3);
Console.ReadLine();

ContinueWith()メソッドで書くなら。。

var task1 = Task<double>.Factory.StartNew(
() =>
{
Thread.Sleep(3000);
return Math.PI;
}).ContinueWith(
task2 =>
{ // 次に呼ばれる
var result4 = task2.Result;
Console.WriteLine(result4);
});
task1.Wait();
Console.ReadLine();

async/awaitでも逐次処理。。

var result5 = await Task<double>.Factory.StartNew(
() =>
{
Thread.Sleep(3000);
return Math.PI;
});
Console.WriteLine(result5);
Console.ReadLine();

APM => TAP

Streamクラスを例に。。

public int Read( byte [] buffer, int offset, int count);
public IAsyncResult BeginRead( byte [] buffer, int offset, int count, AsyncCallback callback, object state);
public int EndRead(IAsyncResult asyncResult);

TAPでは以下のように書く。

public static Task<int> ReadAsync( this Stream stream, byte [] buffer, int offset, int count)
{
if (stream == null) throw new ArgumentNullException(“stream”);
return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead, buffer, offset, count, null);
}

return Task<返値型>.Factory.FromAsync(BeginXXX, EndXXX, arg1, arg2, …, null);が基本。

あれ?キャンセル処理は無いの?

中身はこんな感じになるらしい。

public static Task<int> ReadAsync(this Stream stream, byte [] buffer, int offset, int count)
{
if (stream == null) throw new ArgumentNullException(“stream”);
var tcs = new TaskCompletionSource<int>();
stream.BeginRead(buffer, offset, count, iar =>
{
try { tcs.TrySetResult(stream.EndRead(iar)); }
catch(OperationCanceledException) { tcs.TrySetCanceled(); }
catch(Exception exc) { tcs.TrySetException(exc); }
}, null);
return tcs.Task;
}

TAP => APM

以下の関数を定義しておいて、、、

public static IAsyncResult AsApm<T>(this Task<T> task, AsyncCallback callback, object state)
{
if (task == null) throw new ArgumentNullException(“task”);
var tcs = new TaskCompletionSource<T>(state);
task.ContinueWith(t =>
{
if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions)
else if (t.IsCanceled) tcs.TrySetCanceled();
else tcs.TrySetResult(t.Result);

if (callback != null) callback(tcs.Task);
}, TaskScheduler.Default);
return tcs.Task;
}

以下のメソッドを使って、

public static Task<string> DownloadStringAsync(Uri url);

以下のように実装する。

public IAsyncResult BeginDownloadString( Uri url, AsyncCallback callback, object state)
{
return DownloadStringAsync(url).AsApm(callback, state);
}

public string EndDownloadString(IAsyncResult asyncResult)
{
return ((Task<string>)asyncResult).Result;
}

EAP => TAP

public static Task<string> DownloadStringAsync(Uri url)
{
var tcs = new TaskCompletionSource<string>();
var wc = new WebClient();
wc.DownloadStringCompleted += (s,e) =>
{
if (e.Error != null) tcs.TrySetException(e.Error);
else if (e.Cancelled) tcs.TrySetCanceled();
else tcs.TrySetResult(e.Result);
};
wc.DownloadStringAsync(url);
return tcs.Task;
}

WaitHandle => TAP

public static Task WaitOneAsync(this WaitHandle waitHandle)
{
if (waitHandle == null) throw new ArgumentNullException(“waitHandle”);

var tcs = new TaskCompletionSource<bool>();
var rwh = ThreadPool.RegisterWaitForSingleObject(waitHandle,
delegate { tcs.TrySetResult(true); }, null, -1, true);
var t = tcs.Task;
t.ContinueWith(_ => rwh.Unregister(null));
return t;
}

static Semaphore m_throttle = new Semaphore(N, N);

static async Task DoOperation()
{
await m_throttle.WaitOneAsync();
… // do work
m_throttle.ReleaseOne();
}

すげぇ便利そうで使えそうな気もするし、何やってるのかわかるんだけど、意味わからん。

TAP => WaitHandle

WaitHandle wh = ((IAsyncResult)task).AsyncWaitHandle;

Unitテストについて

[TestMethod]
public async Task UnitTest()

public async void Func()は、イベントハンドラー以外ではやっちゃだめ。例外が取れない。

Reactive Extensions (Rx)

@ITのReactive Extensions (Rx) 入門

コメントを残す