Deep Insider の Tutor コーナー
>>  Deep Insider は本サイトからスピンオフした姉妹サイトです。よろしく! 
C#エンジニアのためのBigQuery入門(2)

C#エンジニアのためのBigQuery入門(2)

BigQuery API(Client Library)を用いたクエリ実行

2015年7月7日

BigQueryをより便利に使うために、C#プログラムからAPIを利用する方法を紹介する。

株式会社グラニ 田中 孝佳
  • このエントリーをはてなブックマークに追加

BigQueryをC#プログラムから実行しよう

 本連載第1回目ではBigQueryの概要を紹介した。BigQueryには使いやすいWebコンソール画面が用意されており、まず使ってみる分にはWebコンソールで十二分である。しかし、例えば定期的に実行してレポートを作成・送信したい場合や、取得したデータに対してさらに複雑な処理を行いたい場合もある。そのために、BigQueryを含めてGoogleサービスにはAPIが用意されており、その認証方法について前回説明した。

 そこで今回は、BigQueryをC#プログラムから利用し、クエリを実行する方法を説明したい。なお今回は、Client Library for .NETのAPIをそのまま利用する方法を紹介する。C#に不可欠といってもよいLINQとBigQueryをつなげたLINQ to BigQueryを次回紹介する予定である。また、解析するためのデータをC#プログラムから投入する方法はその後の回で紹介したい。

クエリをAPIから実行してみよう

プロジェクトの作成と認証の準備

 最初にクエリを実行するために、Google Developer Consoleにログインし、プロジェクトを作成する。すでに作成しているプロジェクトがあればそれを利用してもよい。認証のためのクライアントIDを作成するために、作成したプロジェクトの[API と認証]メニューから[同意画面]を開き、[サービス名]を入力して保存する。その後、[API と認証]の[認証情報]を開き、[新しいクライアント ID を作成]ボタンをクリックし、「サービス アカウント」およびキーのタイプで「P12 キー」を選択して、クライアントIDを作成する。作成するとP12ファイルがダウンロードされるとともに、そのパスワードが表示されるため記録しておく。詳細な手順は前回の番外編の記事も参考にしてほしい。

 準備ができたところで、連載第1回でも紹介した、publicに公開されているサンプルのデータセットに対するクエリをC#から実行してみよう。

シンプルなクエリ実行

 BigQueryのWebコンソールでクエリを実行したときと同じことをC#からやってみよう。まず、コンソールプロジェクトを作成し、NuGetからリスト1のコマンドを実行してBigQueryのクライアントライブラリを取得する。

Install-Package Google.Apis.Bigquery.v2
リスト1 パッケージ・マネージャー・コンソールからGoogle BigQuery APIを追加するためのコマンド

本稿では執筆時点での最新版である1.9.0.2160を利用した。

 次に、認証を行い、API実行の起点となるBigQueryServiceクラスのインスタンスを生成するが、これは下記のようなコードになる。

C#
using Google.Apis.Auth.OAuth2;
using Google.Apis.Bigquery.v2;
using Google.Apis.Services;
using System.Security.Cryptography.X509Certificates;
……省略……

static BigqueryService CreateClient()
{
  var certificate = new X509Certificate2(@"p12ファイルのパス", "p12ファイルのパスワード", X509KeyStorageFlags.Exportable);

  var credential = new ServiceAccountCredential(new ServiceAccountCredential.Initializer("サービスアカウントのメールアドレス")
  {
    Scopes = new[]
    {
      BigqueryService.Scope.Bigquery
    }
  }.FromCertificate(certificate));

  return new BigqueryService(new BaseClientService.Initializer
  {
    ApplicationName = "API Sample",
    HttpClientInitializer = credential
  });
}
リスト2 認証を行い、BigQueryServiceクラスのインスタンスを作成するコード(Program.csファイルのProgramクラス内)

P12ファイルのパスとパスワード、およびサービスアカウントのメールアドレス(=先ほどの[認証情報]のページで取得できる)は各自の環境に合わせてほしい。

 クエリを実行するには、Jobs: query APIを実行する(リスト3)。

C#
using System;
using System.Threading.Tasks;
using Google;
using Google.Apis.Bigquery.v2.Data;
……省略……

static async Task<QueryResponse> Query(BigqueryService bigquery, string projectId, string query)
{
  try
  {
    return await bigquery.Jobs.Query(new QueryRequest
    {
      Query = query
    }, projectId).ExecuteAsync();
  }
  catch (GoogleApiException e)
  {
    Console.WriteLine(e);
    throw;
  }
}
リスト3 Jobs: query APIを実行するコード

 実行するとQueryResponse型の結果が取得できるので、最後に結果を表示してみよう(リスト4)。

C#
using System.Linq;
……省略……

static void DisplayResult(QueryResponse res)
{
  Console.WriteLine("Query Result:");
  Console.WriteLine(string.Join(" ", res.Schema.Fields.Select(f => f.Name)));
  foreach (var row in res.Rows)
  {
    Console.WriteLine(string.Join(" ", row.F.Select(f => f.V)));
  }
  Console.WriteLine("TotalBytes: {0}", res.TotalBytesProcessed);
  Console.WriteLine("TotalRows: {0}", res.TotalRows);
}
リスト4 クエリの結果をConsoleに表示するコード

 クエリの結果のスキーマや結果そのものを表示するだけではなく、スキャンしたバイト数と結果の行数も表示している。BigQueryでは、スキャンしたサイズに応じて課金されるので適宜確認できるようにしておくのがよい。

 クエリを指定して、これらのメソッドを実行するコードは次のようになる。

C#
static async Task ExecuteAsync()
{
  try
  {
    var bigquery = CreateClient();

    var projectId = "<Google Developer Consoleで作成したプロジェクトID>";

    var res = await Query(bigquery, projectId, "SELECT TOP(word, 50), COUNT(*) FROM publicdata:samples.shakespeare");
    DisplayResult(res);
  }
  catch (Exception)
  {
  }

  Console.ReadLine();   // Enterキーを入力すると終了
}
リスト5 認証し、クエリを実行して、その結果を表示する一連のコード

プロジェクトIDはGoogle Developer Consoleのトップページなどで確認できる。

 このコードを実行すると、図1のようにクエリの結果が表示されるはずだ。

図1 クエリ結果が表示された画面

ちなみに上記のコードを実行するには、ProgramクラスのMainメソッド内にExecuteAsync.Wait();を書き加えることで、上記のExecuteAsyncメソッドを呼び出せばよい。

DryRun

 このコードではいきなりクエリを実行するが、Webコンソール同様、事前にクエリのエラーやスキャンするサイズを確認したいこともあるだろう。そのときは、DryRunオプション(=実際にはジョブを実行せずに、基本的に空のデータを処理統計情報付きで返すモード)を有効にするとよい。

 先ほど(リスト3)のQueryメソッドに、DryRun用の引数を追加して指定してみよう(リスト6)。

C#
static async Task<QueryResponse> Query(BigqueryService bigquery, string projectId, string query, bool dryrun = false)
{
  try
  {
    return await bigquery.Jobs.Query(new QueryRequest
    {
      Query = query, 
      DryRun = dryrun
    }, projectId).ExecuteAsync();
  }
  catch (GoogleApiException e)
  {
    Console.WriteLine(e);
    throw;
  }
}
リスト6 DryRunオプションを指定して、Jobs: query APIを実行するコード

 DryRunオプションの指定にかかわらず、クエリに文法エラーがあるとGoogleApiExceptionがスローされる。文法エラーがない場合、DryRunオプションが指定されていると、レスポンスの中身が変わる。その中身を確認するために、リスト4のDisplayResultメソッドを次のように修正してみよう。

C#
static void DisplayResult(QueryResponse res)
{
  if (res.TotalRows.HasValue)
  {
    Console.WriteLine("Query Result:");
    Console.WriteLine(string.Join(" ", res.Schema.Fields.Select(f => f.Name)));
    foreach (var row in res.Rows)
    {
      Console.WriteLine(string.Join(" ", row.F.Select(f => f.V)));
    }
    Console.WriteLine("TotalBytes: {0}", res.TotalBytesProcessed);
    Console.WriteLine("TotalRows: {0}", res.TotalRows);
  }
  else
  {
    Console.WriteLine("DryRun Result:");
    Console.WriteLine("TotalBytes: {0}", res.TotalBytesProcessed);
  }
}
リスト7 DryRun指定に対応したConsole表示のコード

 ドキュメント(英語)には明示されていないが、DryRunオプションがtrueに指定されていた場合、QueryResponseオブジェクトのTotalRowsプロパティの値が「null」になること(=HasValue)を利用して条件分岐している。

クエリキャッシュ

 BigQueryはパフォーマンス向上のため、結果が変わらない場合はベストエフォートでキャッシュを利用してレスポンスを返すようにしている。この機能はデフォルトで有効になっているが、使わないように指定することもできる。次のコードのように、UseQueryCacheオプションを指定することで制御でき、実行した結果がキャッシュを使ったものかどうかはCacheHitプロパティで確認できる。

C#
static async Task<QueryResponse> Query(BigqueryService bigquery, string projectId, string query, bool dryrun = false, bool useQueryCache = true)
{
  try
  {
    return await bigquery.Jobs.Query(new QueryRequest
    {
      Query = query,
      DryRun = dryrun,
      UseQueryCache = useQueryCache
    }, projectId).ExecuteAsync();
  }
  catch (GoogleApiException e)
  {
    Console.WriteLine(e);
    throw;
  }
}

static void DisplayResult(QueryResponse res)
{
  if (res.TotalRows.HasValue)
  {
    Console.WriteLine("Query Result:");
    Console.WriteLine(string.Join(" ", res.Schema.Fields.Select(f => f.Name)));
    foreach (var row in res.Rows)
    {
      Console.WriteLine(string.Join(" ", row.F.Select(f => f.V)));
    }
    Console.WriteLine(res.CacheHit.HasValue && res.CacheHit.Value ? "UseCache" : "NoCache");
    Console.WriteLine("TotalBytes: {0}", res.TotalBytesProcessed);
    Console.WriteLine("TotalRows: {0}", res.TotalRows);
  }
  else
  {
    Console.WriteLine("DryRun Result:");
    Console.WriteLine("TotalBytes: {0}", res.TotalBytesProcessed);
  }
}
リスト8 UseQueryCacheオプションを指定して、Jobs: query APIを実行するコード

非同期実行

 ここまで使ってきた、Jobs: query APIはGoogleサービスとしては同期的に実行するAPIであった(C#コードではasyncawaitで結果が返ってくるのを非同期に待機しているが、Googleサービス側ではAPIリクエストを受けて、結果をそのレスポンスで返している)。BigQueryのAPIには、より長い時間がかかるクエリの実行を想定して、非同期的に問い合わせる仕組みが備わっている。その流れは次の通りである。

  1. Jobs: insert APIで、クエリを実行するジョブを開始する
  2. Jobs: get APIで、ジョブの状態が完了になるまで待機する
  3. Jobs: getQueryResults APIで、ジョブとして実行したクエリの結果を取得する

 以下に示すのが、一連の流れをコードにしたものである。

C#
static async Task ExecuteAsync()
{
  try
  {
    var bigquery = CreateClient();
    var projectId = "<プロジェクトID>";
    var job =
      await StartQuery(bigquery, projectId,
          "SELECT TOP(word, 50), COUNT(*) FROM publicdata:samples.shakespeare");
    var queryRes = await WaitForJobFinish(bigquery, job.JobReference);
    DisplayResult(queryRes);
  }
  catch (Exception)
  {
  }

  Console.ReadLine();   // Enterキーを入力すると終了
}

static Task<Job> StartQuery(BigqueryService bigquery, string projectId, string query)
{
  return bigquery.Jobs.Insert(new Job
  {
    Configuration = new JobConfiguration
    {
      Query = new JobConfigurationQuery
      {
        Query = query
      }
    }
  }, projectId).ExecuteAsync();
}

static async Task<GetQueryResultsResponse> WaitForJobFinish(BigqueryService bigquery, JobReference job)
{
  while (true)
  {
    Console.Write(".");
    var res = await bigquery.Jobs.Get(job.ProjectId, job.JobId).ExecuteAsync();
    if (res.Status.State != "RUNNING" && res.Status.State != "PENDING")
    {
      Console.WriteLine();
      var error = res.Status.ErrorResult;
      if (error != null)
      {
        Console.WriteLine(error.Message);
        throw new Exception(error.Message);
      }
      else
      {
        // errorsは警告含む
        var errors = res.Status.Errors;
        if (errors != null)
        {
          foreach (var errorProto in errors)
          {
            Console.WriteLine(errorProto);
          }
        }
        return await bigquery.Jobs.GetQueryResults(job.ProjectId, job.JobId).ExecuteAsync();
      }
    }
    await Task.Delay(TimeSpan.FromSeconds(3));
  }
}

static void DisplayResult(GetQueryResultsResponse res)
{
  if (res.TotalRows.HasValue)
  {
    Console.WriteLine("Query Result:");
    Console.WriteLine(string.Join(" ", res.Schema.Fields.Select(f => f.Name)));
    foreach (var row in res.Rows)
    {
      Console.WriteLine(string.Join(" ", row.F.Select(f => f.V)));
    }
    Console.WriteLine(res.CacheHit.HasValue && res.CacheHit.Value ? "UseCache" : "NoCache");
    Console.WriteLine("TotalBytes: {0}", res.TotalBytesProcessed);
    Console.WriteLine("TotalRows: {0}", res.TotalRows);
  }
  else
  {
    Console.WriteLine("DryRun Result:");
    Console.WriteLine("TotalBytes: {0}", res.TotalBytesProcessed);
  }
}
リスト9 クエリを非同期のJobとして実行する方法

リスト5で記述したstatic async Task ExecuteAsync()メソッドは、後述の「同期実行でのタイムアウト」の説明で再度利用するので、ここでは/**/でコメントアウトしておいてほしい。

 StartQueryメソッドでジョブとしてクエリを開始した後、WaitForJobFinishメソッドでジョブが完了するまで待機している。より正確には、StatusPENDING(開始前)もしくはRUNNING(実行中)以外の状態になるまで3秒ごとにチェックしながら待機している。

 ジョブが完了した後、ErrorResultプロパティ値がnullでない場合はエラーで終了しているため、エラー情報を取得している。また、エラーではないが、警告が存在しているケースもあり得るため、Errorsプロパティ値もチェックしている。

 最後に、Jobs: getQueryResult API(=bigquery.Jobs.GetQueryResultsメソッド)でクエリの結果を取得し、表示している。

 DisplayResultメソッドは、リスト4で示したメソッドと引数の型が違うが、表示する情報は同じ名前のプロパティであるため、Jobs: query APIを使った場合と同じコードになっている。

 なお、このコードではWaitForJobFinishメソッドにタイムアウトの機能はないため、ジョブが完了しない限りこのメソッドは完了しない。実践で活用する場合には、適宜、タイムアウト処理を入れるのがよいだろう。ついでに、前半で説明した同期実行の場合のタイムアウト処理について、次に簡単に説明しておこう。

同期実行でのタイムアウト

 上記のようにジョブを使って非同期実行する場合はタイムアウトがないが、Jobs: query APIを使って同期実行する場合、デフォルトで10秒のタイムアウト制限がある。これは、TimeoutMsプロパティを指定することで、長くもしくは短くすることができる。

 指定した時間が経過してタイムアウトになった場合、先ほどの非同期実行のところで紹介したJobs: getQueryResult APIを使うことで、ジョブの状態および、ジョブが完了するまで待機した場合のクエリの結果を取得できる。具体的には、次のコードのように、QueryResponseオブジェクトのJobCompleteプロパティをチェックし、trueでなければジョブが完了するまで待機するようにすればよい。

C#
static async Task ExecuteAsync()
{
  try
  {
    var bigquery = CreateClient();

    var projectId = "<プロジェクトID>";

    var res = await Query(bigquery, projectId, "SELECT TOP(word, 50), COUNT(*) FROM publicdata:samples.shakespeare", false, false);
    if (res.JobComplete == true)
    {
      DisplayResult(res);
    }
    else
    {
      var job = await WaitForJobFinish(bigquery, res.JobReference);
      DisplayResult(job);
    }
  }
  catch (Exception)
  {
  }
}
リスト10 Jobs: query APIの結果がタイムアウトしたケースを考慮したコード

リスト9でコメントアウトしていたコードを復活させてこのコードに書き換える。逆にリスト9の方のstatic async Task ExecuteAsync()メソッドをコメントアウトする。

 脱線したが、再び非同期実行の説明に戻ろう。

バッチモードでの実行

 BigQueryにはクエリの同時実行数の上限が決められている。これを回避するために、ジョブの優先度をバッチモードに変更する方法がある。

 デフォルトで指定されるインタラクティブモードが可能な限りすぐにジョブを実行するのに対し、バッチモードではジョブをキュー(待ち行列)にためておき、BigQueryの空きリソースが出たタイミングで実行する。そのため、同時実行数の制限は受けない。ただし、日次の実行上限数の制限は受けることに注意してほしい。

 バッチモードでジョブを実行するためには、JobConfigurationQueryオブジェクトのPriorityプロパティでBATCHと指定すればよい。その後のジョブの完了の待機と結果の取得・表示は、インタラクティブモードのときと同じである。

C#
static Task<Job> StartQuery(BigqueryService bigquery, string projectId, string query, bool isBatch = false)
{
  return bigquery.Jobs.Insert(new Job
  {
    Configuration = new JobConfiguration
    {
      Query = new JobConfigurationQuery
      {
        Query = query,
        Priority = isBatch ? "BATCH" : "INTERACTIVE"
      }
    }
  }, projectId).ExecuteAsync();
}
リスト11 引数でバッチモードでのクエリ実行をできるようにしたコード

再び、リスト9のstatic async Task ExecuteAsync()メソッドの方を復活させてから、このコードを記述・実行する。

クエリ結果をテーブルとして保存する

 クエリの結果に対し、さらに複数のクエリを実行する場合などでは、クエリの結果をいったんBigQueryの独自テーブルに保存しておくのが便利なときがある(正確には、ここまで出てきたクエリも24時間で消える一時テーブルに保存されているが、「削除しない限り、永続的にテーブルに保存する」という意味である)。また、BigQueryの場合、クエリの出力サイズに制限があるが、テーブルに保存する場合はこの制限を受けない。

 なお、ここから先のデータセットの作成およびクエリ結果をテーブルとして保存する処理はGoogle Cloud Platformへの課金を有効にしていないと実行できない。Google Developer Consoleの請求先アカウントページで新しい請求先を登録の上、Developer Console右上の[設定]アイコンから[プロジェクト課金設定]を開き、プロジェクトの課金設定を有効にしてほしい(図2)。以降の処理は課金対象となるため、実行には注意だが筆者の環境では10回ほど実行したが、無償の範囲内に収まっている。Google Cloud Platformの無料トライアルも本稿執筆時点では行われているので活用してほしい。

図2 [設定]メニューで開いたプロジェクト課金設定

[設定]メニューの[プロジェクト課金設定]をクリック

図2 プロジェクト課金設定を開いたところ

[課金を有効にする]ボタンをクリック

図2 [設定]メニューで開いたプロジェクト課金設定(上)/プロジェクト課金設定を開いたところ(下)

[課金を有効にする]ボタンを押すと、そのプロジェクトに対して課金が有効になる。

 クエリ結果をテーブルとして保存する場合はJobConfigurationQueryオブジェクトのDestinationTableに保存するテーブルを指定する。このとき、プロジェクトおよびデータセットは存在している必要がある(ちなみに、後述のコードで必要となるDataset IDは第1回で示している図9のページから取得できる。テーブルはクエリ実行時に作成される)。また、AllowLargeResultsプロパティをtrueにすることでクエリの上限サイズの制限を超えられるが、クエリ構文に一部制限が入る。例えば、ここまでのサンプルで使っていたTOP関数は使用できないので、GROUP BYCOUNTを使ったクエリにする必要がある(リスト12)。

C#
static async Task ExecuteAsync()
{
  try
  {
    var bigquery = CreateClient();
    var projectId = "<プロジェクトID>";
    var job =
      await StartQueryToTable(bigquery, projectId,
          "SELECT word, COUNT(*)as count FROM publicdata:samples.shakespeare GROUP BY word▲");
    var queryRes = await WaitForJobFinish(bigquery, job.JobReference);
    DisplayResult(queryRes);
  }
  catch (Exception)
  {
  }

  Console.ReadLine();   // Enterキーを入力すると終了
}

static Task<Job> StartQueryToTable(BigqueryService bigquery, string projectId, string query, string datasetId, string tableId, bool isBatch = false)
{
  return bigquery.Jobs.Insert(new Job
  {
    Configuration = new JobConfiguration
    {
      Query = new JobConfigurationQuery
      {
        Query = query,
        Priority = isBatch ? "BATCH" : "INTERACTIVE",
        AllowLargeResults = true,
        DestinationTable = new TableReference
        {
          ProjectId = projectId, 
          DatasetId = datasetId,
          TableId = tableId
        }
      }
    }
  }, projectId).ExecuteAsync();
}
リスト12 クエリ結果をテーブルとして保存するようにしたコード

 ここまでC#コードからBigQueryのクエリを実行し、結果を取得する方法を紹介してきた。しかし、取得した結果をC#コードで処理することを考えたとき、今のままでは1レコードがobjectの配列となっており、非常に使い勝手が悪い。特に、C#でデータ処理をするのであれば、LINQ to Objectsの活用を検討したいものである。そこで、次回はLINQの使い勝手のままにBigQueryのクエリが利用できる、LINQ to BigQueryを紹介したい。

※以下では、本稿の前後を合わせて5回分(第1回~第5回)のみ表示しています。
 連載の全タイトルを参照するには、[この記事の連載 INDEX]を参照してください。

C#エンジニアのためのBigQuery入門(2)
1. 誰でも簡単に超高速なクエリができるBigQueryとは?

知らないと損! 使わないと損! これからのデータ解析に必須のBigQueryの概要を紹介。また、Webコンソールからのクエリ実行の基礎を解説する。

C#エンジニアのためのBigQuery入門(2)
2. Google API Client Library for .NETの使い方

BigQueryをはじめ、GoogleのほとんどのサービスはAPIが提供されている。これを.NETから利用するためのライブラリの基本的な使用方法を解説する。

C#エンジニアのためのBigQuery入門(2)
3. 【現在、表示中】≫ BigQuery API(Client Library)を用いたクエリ実行

BigQueryをより便利に使うために、C#プログラムからAPIを利用する方法を紹介する。

C#エンジニアのためのBigQuery入門(2)
4. LINQ to BigQueryによるクエリ実行

C#プログラマーにおなじみのLINQでBigQueryのクエリを実行するライブラリを紹介。

C#エンジニアのためのBigQuery入門(2)
5. LINQでBigQuery: データスキャン量を抑えたクエリの実行方法

膨大なデータへのクエリで、スキャン量を減らしてクエリの課金額を抑えるには? テーブルワイルドカード関数とテーブルデコレーターを説明する。

サイトからのお知らせ

Twitterでつぶやこう!