Azure Durable FunctionsのFunctions Fan-In/Fan-Outを試してみたので記載していきます。
試した内容はDurable Fuctionsの公式にサイトにあるSamplesとWalkthrouthsの内容です。
Azure Durable Functionsのローカル実行環境を構築していない人は下記を参照してください。
本記事はローカル実行環境が構築済みであることを前提とします。
Durable Functionsの他の機能についてはこちら。
※本記事は2017年7月28日時点の情報となります。
※現時点ではWindosOSでのみ対応しているのでWindows10で実施した内容となります。
Fan-In/Fan-Out 基本編
Fan-In/Fan-Outは複数の関数を同時に呼び出すオーケストレータ関数を記述し、その結果に対して何らかの集計を実行する方法を示す高度なユースケースです。
下図で説明するとF2のFunctionが並列で複数同時に実行されるのをイメージしてください。並列で実行されたFunction毎の処理完了タイミングは別々ですので全てのFunctionが完了するのを待ってから次の処理に移行します。例えばAzure Blob Storageにファイルアップロードを並列で行う等の処理にむいていたりします。
公式サイトにあるサンプルソース(VSSample)の中身を見ていきましょう。
サンプルソースにあるBackupSiteContent.csがFan-In/Fan-Outのサンプルになります。このサンプルは指定ディレクトリ配下にあるファイルを全て取得し再帰的にBlobStorageにアップロードするというものです。
このソース内にはE2_BackupSiteContentというorchestrator functionとE2_GetFileList、E2_CopyFileToBlobの3つの関数が用意されています。実行時に呼び出す関数はE2_BackupSiteContentになります、パラメータとしてわたってきたフォルダパスを用いてE2_GetFileList関数でファイル情報を取得し、マルチタスク化したE2_CopyFileToBlobで複数同時にアップロードします。
実行する前にアップロード用のテストファイルを作成する必要があるので下記のコマンドを実行します。管理者権限でコマンドプロンプトを起動して実行してください。
fsutil file createnew testfile 1000000
私は下記のようにテストファイルを作成して実施しました。
public static class BackupSiteContent
{
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run( [OrchestrationTrigger] DurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>();
if (string.IsNullOrEmpty(rootDirectory))
{
throw new ArgumentNullException(nameof(rootDirectory), "A root directory must be specified");
}
string[] files = await backupContext.CallFunctionAsync<string[]>("E2_GetFileList",rootDirectory);
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallFunctionAsync<long>("E2_CopyFileToBlob",files[i]);
}
await Task.WhenAll(tasks);
long totalBytes = tasks.Sum(t => t.Result);
return totalBytes;
}
[FunctionName("E2_GetFileList")]
public static string[] GetFileList([ActivityTrigger] DurableActivityContext getFileListContext,TraceWriter log)
{
string rootDirectory = getFileListContext.GetInput<string>();
log.Info($"Searching for files under {rootDirectory}...");
string[] files = Directory.GetFiles(rootDirectory, "*", SearchOption.AllDirectories);
log.Info($"Found {files.Length} file(s) under {rootDirectory}.");
return files;
}
[FunctionName("E2_CopyFileToBlob")]
public static async Task<long> CopyFileToBlob([ActivityTrigger] DurableActivityContext copyFileContext,Binder binder,TraceWriter log)
{
string filePath = copyFileContext.GetInput<string>();
long byteCount = new FileInfo(filePath).Length;
// strip the drive letter prefix and convert to forward slashes
string blobPath = filePath.Substring(Path.GetPathRoot(filePath).Length).Replace('\\', '/');
string outputLocation = $"backups/{blobPath}";
log.Info($"Copying '{filePath}' to '{outputLocation}'. Total bytes = {byteCount}.");
// copy the file contents into a blob
using (Stream source = File.Open(filePath, FileMode.Open))
using (Stream destination = await binder.BindAsync<CloudBlobStream>(new BlobAttribute(outputLocation)))
{
await source.CopyToAsync(destination);
}
return byteCount;
}
}
今回はAzureStorageにアップロードするので実際にBlobStorageを作成して試しました。local.setting.jsonファイルのAzureWebJobsStorageとAzureWebJobsDashboardに実際のAzure Storageの接続文字列を設定します。
“AzureWebJobsStorage”: “DefaultEndpointsProtocol=https;AccountName=***;AccountKey=*****”,
“AzureWebJobsDashboard”: “DefaultEndpointsProtocol=https;AccountName=***;AccountKey=*****”,
POSTする時に若干の設定が必要になります。POSTパラメータとしてJSON形式でディレクトリパスを設定する必要があるのでHeadersにContent-Type:application/json、Bodyにrawで“ディレクトリパス”を指定します。ここではローカル実行するのでローカルのディレクトリパスを指定しています。
- Headersの設定
- Bodyの設定
実行URLは下記を指定します。
それでは実行してみましょう。
POSTして正常に処理が完了するとレスポンスとして3つのURLが返却されます。
- statusQueryGetUri
- sendEventPostUri
- terminatePostUri
サンプルサイトではstatusQueryGetUriしか利用していません。statusQueryGetUriは状態を確認するためのURLでGETで叩くと処理状況を確認することができます。statusQueryGetUriをGETで状況を確認してみます。
処理中であればrutimeStatusが”Running”で表示されます。
処理が完了していればrutimeStatusが”Completed”、outputにアップロードしたファイルのトータル容量が表示されます。
AzureStorageを確認すると正常にアップロードされているのがわかります。
Fan-In/Fan-Outの肝となる部分は下記のソースの部分です。
var tasks = new Task<long>[files.Length];
for (int i = 0; i < files.Length; i++)
{
tasks[i] = backupContext.CallFunctionAsync<long>(
"E2_CopyFileToBlob",
files[i]);
}
await Task.WhenAll(tasks);
取得したファイル情報分のアップロードTaskを作成しWhenAllで非同期に実行して全てのTaskが完了するのを待ちます。この方法で効率よく関数を並列で実行し処理効率をあげることができます。但し、このソースだとファイル数分のTaskを作成してしまうためファイル数が多ければその分Taskを作成して並列で実行してしまうため処理負荷が高まりエラーが発生する可能性も高まります。
Fan-In/Fan-Out 応用編
先にも述べたとおり並列実行数が多いとエラーの発生率が高まるため、並列実行数に制限をかける様にソースを変更したいと思います。
試しにソースを変更する前に下記のテストデータで実行してみます。15ファイルあるので変更前のソースだと15Taskで並列に実行されます。
実行すると何件かエラーが発生しました。
実際に上がっているファイルを確認すると15ファイル中10ファイルがアップロードされました。あがっていないファイルは何かしらの理由、多分ですが高負荷が理由でエラーとなりアップロードに失敗したと思われます。
この現象を回避すべく同時並行で実こうするTaskに制限をかけるように修正します。
Runメソッドを下記のように書き換えました。
[FunctionName("E2_BackupSiteContent")]
public static async Task<long> Run([OrchestrationTrigger] DurableOrchestrationContext backupContext)
{
string rootDirectory = backupContext.GetInput<string>();
if (string.IsNullOrEmpty(rootDirectory))
{
throw new ArgumentNullException(nameof(rootDirectory), "A root directory must be specified");
}
string[] files = await backupContext.CallFunctionAsync<string[]>("E2_GetFileList",rootDirectory);
var tasks = new List<Task<long>>();
long totalBytes = 0;
for (int i = 0; i < files.Length; i++)
{
tasks.Add(backupContext.CallFunctionAsync<long>("E2_CopyFileToBlob",files[i]));
// Taskを5並列で実行
if ((tasks.Count == 5 && files.Length >= 5) ||
(tasks.Count == files.Length % 5 && files.Length >= 5) ||
(files.Length < 5 && tasks.Count == files.Length))
{
await Task.WhenAll(tasks);
totalBytes += tasks.Sum(t => t.Result);
tasks = new List<Task<long>>();
}
}
return totalBytes;
}
TaskをListに追加して5件づつ処理するようにしました。条件文がちょっと複雑になりましたがご愛嬌で。並列実行数が5件以内になるので安定してアップロードできるようになったと思います。マシンスペックにもよりますがアップロードするファイルの容量によって並列実行数を動的に変更できたりすると効率よくアップロードできると思います。
Fan-In/Fan-Outは並列処理で効率よく捌けるので使いどころの多いパターンだと思います。
コメント