C#エンジニアのためのBigQuery入門(5)
BigQueryでのテーブル作成とデータ挿入
連載最終回。C#のクライアントライブラリを使ってBigQueryへデータを挿入する方法を説明。Google Cloud Storage APIの活用や、Streaming Insertについても解説する。
さて今までの連載では、テーブルはすでに公開されているものを利用して説明してきた。今回の記事ではテーブルを自作し、そこにデータを挿入する方法を説明する。
データ挿入の方法
BigQueryでデータ投入する場合、投入元のデータはファイルとして作成しておくケースが多い。そして挿入元のデータの配置場所としてBigQueryがサポートしているのが、
- Google Cloud Storage
 - HTTP Postリクエストによるローカルファイルからの挿入
 
である。前者には大量のファイルをまとめて挿入できるといったメリットがあり、後者にはファイルのある場所から手軽に直接挿入できるというメリットがある。BigQueryのWebコンソールから実行できる方法もこの2つだ。
この他、
- BigQuery独自の仕組みであるStreaming Insertによるデータ挿入
 
も可能である。本稿ではこれについても説明する。これ以外には、第2回で紹介した、
- クエリ結果をテーブルとして保存する方法
 
が使えるケースもあるだろう。
これらのデータ挿入の方法は、BigQueryのWebコンソールや他の言語のクライアントライブラリからも実行可能であるが、今回は全てC#のプログラムから実行している。
Google Cloud Storageからのデータ挿入
まず、Google Cloud Storageからのデータ挿入について説明する。Google Cloud Storageとは、グーグルが提供するオブジェクト・ストレージ・サービスで、AWSのS3(Simple Storage Service)、AzureのBlobストレージサービスと同様のクラウドサービスである。この方法を採用する場合、サーバーにあるデータをいったんGoogle Cloud Storageにアップロードした後にBigQueryへインポートすることが多いため、アップロード、インポートの順に説明していく。
BigQueryにインポートする際にサポートされる形式は、CSVもしくは1行ごとが1レコードに対応するJSONのどちらかであり、それぞれ非圧縮もしくはGZip圧縮に対応している。データをインポートする速度は非圧縮の方が速いが、Cloud Storageに保存しているコストおよびアップロード時のネットワーク帯域幅のコストがかかるため、非圧縮/GZip圧縮のどちらが適しているかはケース・バイ・ケースで判断する必要がある。この他にも、ファイルサイズや一日あたりに実行できる回数の制限もあるため、実運用においてはデータ形式に関する詳細がまとめられている「Preparing Data for BigQuery(英語)」を参照してほしい。今回は、JSON形式のファイルをプログラムの中でGZip圧縮する(その方法は、アップロード&インポート処理を行う例の中で説明する)。
Google Cloud Storage APIの利用
BigQuery同様、Cloud StorageもAPIを利用するため準備が必要である。今連載の番外編および第2回で説明している手順でBigQueryのAPIを有効化している場合、必要な手順はGoogle Developers ConsoleでGoogle Cloud Storage APIを有効化しておくことである。Google Cloud Storage JSON APIを有効化しておこう。
 APIに対応したC#のクライアントライブラリもBigQuery同様、NuGetから参照を追加しておく。Google.Apis.Storage.v1をGoogle.Apis.BigQuery.v2と併せて追加しておいてほしい。
APIへの認証もBigQuery同様に実行する。今回いくつかの処理をコードで紹介するが、全てのコードを1つのクラスの中に定義できるようにしている。Google APIの認証を含めて共通の処理をリスト1に記載しているので、プロジェクト名や認証鍵の場所などを各自の環境に合わせて利用してほしい。なお、BigQueryとGoogle Cloud Storageは同じ証明書・サービスアカウントで認証しているが、アクセス管理などの理由で必要がある場合には別の証明書もしくはアカウントでも認証できる。
| 
 using System.IO; 
using System.IO.Compression; 
using System.Linq; 
using System.Security.Cryptography.X509Certificates; 
using Google.Apis.Auth.OAuth2; 
using Google.Apis.Bigquery.v2; 
using Google.Apis.Bigquery.v2.Data; 
using Google.Apis.Services; 
using Google.Apis.Storage.v1; 
using Google.Apis.Storage.v1.Data; 
using Google.Apis.Upload; 
using Google.Apis.Util; 
……省略…… 
private IBackOff backOff = new ExponentialBackOff(); 
private BigqueryService bigquery; 
private BigqueryService Bigquery => bigquery ?? (bigquery = CreateBigqueryClient()); 
private StorageService storage; 
private StorageService Storage => storage ?? (storage = CreateStorageClient()); 
private string ProjectId => "<プロジェクトID>"; 
private BigqueryService CreateBigqueryClient() 
{ 
  var certificate = new X509Certificate2(@"<p12証明書ファイルのパス>", "<証明書のパスワード>", X509KeyStorageFlags.Exportable); 
  var credential = new ServiceAccountCredential(new ServiceAccountCredential.Initializer("<サービスアカウントのメールアドレス>") 
  { 
    Scopes = new[] 
    { 
      BigqueryService.Scope.Bigquery 
    } 
  }.FromCertificate(certificate)); 
  return new BigqueryService(new BaseClientService.Initializer 
  { 
    ApplicationName = "<アプリ名>", 
    HttpClientInitializer = credential 
  }); 
} 
private StorageService CreateStorageClient() 
{ 
  var certificate = new X509Certificate2(@"<p12証明書ファイルのパス>", "<証明書のパスワード>", X509KeyStorageFlags.Exportable); 
  var credential = new ServiceAccountCredential(new ServiceAccountCredential.Initializer("<サービスアカウントのメールアドレス>") 
  { 
    Scopes = new[] 
    { 
      StorageService.Scope.CloudPlatform 
    } 
  }.FromCertificate(certificate)); 
  return new StorageService(new BaseClientService.Initializer 
  { 
    ApplicationName = "<アプリ名>", 
    HttpClientInitializer = credential 
  }); 
} 
 | 
一部のusing節やプロパティはリスト2内では利用していないが、この後のコードで利用する。
Google Cloud Storageへのアップロード
それではGoogle Cloud Storageにファイルをアップロードしてみよう。
Google Cloud Storageはバケットというオブジェクトの入れ物を用意し、その中にキーを指定してオブジェクトを管理する。バケットは地理的に存在する場所を指定できるが、BigQueryの場所と同一にしておくのがいいだろう。またストレージクラスという指定があるが、これは冗長性を指定できる。要件に合わせて指定すればよいが、Google Cloud StorageではNEARLINEと高可用性コールドストレージというオプションが選択できる。コールドストレージといっても最初の1byteの取得が3秒程度でありコストも安い。今回テストコードであるのでNEARLINEを指定しているが、実際の運用でも十分利用価値のあるオプションだろう。
リスト2が、実際にアップロードするファイルの内容である。
| 
 {"name":"a", "value":3} 
{"name":"b", "value":5} 
 | 
 リスト2のファイルGZip圧縮し、「import-test-bucket」というバケットを作成し、そのバケットに「data.json.gz」というキーで格納するコードがリスト3である。アップロードする処理でwhile文で成功するまで一定回数繰り返している。これは失敗する可能性があり、かつ繰り返し実行しても整合性が保てる処理のエラー時対策である。単純に一定時間間隔だけ待ってもよいが、繰り返しエラーになるときはエラー状態がより長く続くことが予想されるため、試行回数が多くなるにつれて指数関数的に待機時間を長くするExponential Backoffを採用している。Google API Client Libraryを参照すると、その中にExponentialBackOffクラスが含まれているのですぐに利用できる。
| 
 async Task Upload() 
{ 
  var bucketName = "import-test-bucket"; 
  // バケットの作成。すでに同じ名前で作成している場合はエラーになる。 
  await Storage.Buckets.Insert(new Bucket 
  { 
    Name = bucketName, 
    Location = "US", 
    StorageClass = "NEARLINE" 
  }, ProjectId).ExecuteAsync(); 
  Console.WriteLine("Bucket created."); 
  //pathで指定したファイルをGZip圧縮してGCSにアップロード 
  var path = @"<JSONファイルの場所>"; 
  byte[] bytes; 
  using (var ms = new MemoryStream()) 
  { 
    using (var gzip = new GZipStream(ms, CompressionMode.Compress)) 
    using (var reader = new FileStream(path, FileMode.Open)) 
    { 
      reader.CopyTo(gzip); 
    } 
    bytes = ms.ToArray(); 
  } 
  var retry = 1; 
  while (true) 
  { 
    using (var str = new MemoryStream(bytes)) 
    { 
      var res = await Storage.Objects.Insert(new Google.Apis.Storage.v1.Data.Object 
      { 
        Bucket = bucketName, 
        Name = "data.json.gz", 
      }, bucketName, str, "application/gzip").UploadAsync(new CancellationToken()); 
      if (res.Status == UploadStatus.Completed) 
      { 
        Console.WriteLine("File uploaded"); 
        break; 
      } 
    } 
    await Task.Delay(backOff.GetNextBackOff(retry)); 
    retry++; 
    if (retry > backOff.MaxNumOfRetries) 
    { 
      throw new TimeoutException(); 
    } 
  } 
} 
 | 
Google Cloud Storageからのインポート
次にアップロードしたファイルをBigQueryにインポートしよう。
 BigQueryにインポートする際は、Google Cloud Storage上のファイルをgs://<bucket名>/<オブジェクトのキー>という形式のURIで指定できるが、配列で複数指定したり、*を使って前方一致で複数指定したりすることができる。今回は1つしかファイルは存在しないが、前方一致で指定している。
 CreateDispositionプロパティでインポートする際にテーブルを作成するかどうかを指定できる。今回はテーブルが存在しないため、インポートと同時にテーブルを作成している。この場合は、Schemaプロパティに作成するテーブルのスキーマを指定する。カラムのデータ型などはドキュメントコメントを参照してほしい。
 インポートするAPIを実行すると、返り値はJobとして返ってくる。これはGoogle Cloudのサーバー側で長時間かかる処理を実行するときに使われるもので、第2回のクエリの実行のところで出てきたJobと同じものである。返ってきたJobのIdを指定してJobの状態を取得できるため、完了もしくはエラーによる終了まで定期的にJobの状態を監視し、結果を取得する必要がある。これを行っているのが、リスト4のWaitForCompleteメソッドである。
| 
 async Task ImportFile() 
{ 
  var bucketName = "import-test-bucket"; 
  var dataset = "import_test"; 
  await Bigquery.Datasets.Insert(new Dataset 
  { 
    DatasetReference = new DatasetReference 
    { 
      ProjectId = ProjectId, 
      DatasetId = dataset 
    } 
  }, ProjectId).ExecuteAsync(); 
  Console.WriteLine("Dataset created."); 
  var job = await bigquery.Jobs.Insert(new Job 
  { 
    Configuration = new JobConfiguration 
    { 
      Load = new JobConfigurationLoad 
      { 
        CreateDisposition = "CREATE_IF_NEEDED", 
        DestinationTable = new TableReference 
        { 
          ProjectId = ProjectId, 
          DatasetId = dataset, 
          TableId = "UploadAndImportFile" 
        }, 
        SourceFormat = "NEWLINE_DELIMITED_JSON", 
        SourceUris = new List<string> 
        { 
          $"gs://{bucketName}/*" 
        }, 
        WriteDisposition = "WRITE_APPEND", 
        Schema = new TableSchema 
        { 
          Fields = new List<TableFieldSchema> 
          { 
            new TableFieldSchema 
            { 
              Name = "name", 
              Type = "STRING", 
              Mode = "REQUIRED" 
            }, 
            new TableFieldSchema 
            { 
              Name = "value", 
              Type = "INTEGER", 
              Mode = "REQUIRED" 
            } 
          } 
        } 
      } 
    } 
  }, ProjectId).ExecuteAsync(); 
  Console.WriteLine($"Job started with JobId: {job.JobReference.JobId}"); 
  await WaitForComplete(job); 
  Console.WriteLine("Completed"); 
} 
async Task WaitForComplete(Job job) 
{ 
  var timeout = TimeSpan.FromMinutes(5); 
  var interval = TimeSpan.FromSeconds(2); 
  var times = timeout.Ticks / interval.Ticks; 
  var c = 0; 
  while (true) 
  { 
    var statusRes = await Bigquery.Jobs.Get(ProjectId, job.JobReference.JobId).ExecuteAsync(); 
    Console.WriteLine(statusRes.Status.State); 
    if (statusRes.Status.State != "RUNNING" && statusRes.Status.State != "PENDING") 
    { 
      var error = statusRes.Status.ErrorResult; 
      if (error != null) 
      { 
        throw new Exception(error.Message); 
      } 
      return; 
    } 
    if (++c >= times) 
    { 
      throw new TimeoutException(); 
    } 
    await Task.Delay(interval); 
  } 
} 
 | 
インポート処理のAPIはJobを返すため、Jobが終了するまで監視するメソッドを定義している。
POSTリクエストによるファイルからのデータ挿入
次にHTTP POSTリクエストによるデータ挿入を説明する。
Google Cloud Storageからのインポートであれば、大容量のデータをインポートできる半面、一度Google Cloud Storageにアップロードする必要がある。
 ファイル単体であれば、直接BigQueryに挿入する方法が用意されている。先ほどGoogle Cloud Storageにアップロードした同じファイルを用いて実行したのがリスト5のコードである。リスト4同様、Jobを作成するAPIをたたいているが、第3引数にアップロードするStreamを渡している。これにより内部ではMedia Uploadと呼ばれる処理が実行され、返り値もJobにおけるアップロード状況を取得できるIUploadProgress型になる。
| 
 async Task CreateWithPostRequest() 
{ 
  var dataset = "test"; 
  using (var stream = new FileStream(@"<JSONファイルの場所>", FileMode.Open)) 
  { 
    var upload = await Bigquery.Jobs.Insert(new Job 
    { 
      Configuration = new JobConfiguration 
      { 
        Load = new JobConfigurationLoad 
        { 
          SourceFormat = "NEWLINE_DELIMITED_JSON", 
          Schema = new TableSchema 
          { 
            Fields = new List<TableFieldSchema> 
        { 
          new TableFieldSchema 
          { 
            Name = "name", 
            Type = "STRING" 
          }, 
          new TableFieldSchema 
          { 
            Name = "value", 
            Type = "INTEGER" 
          }, 
        } 
          }, 
          DestinationTable = new TableReference 
          { 
            ProjectId = ProjectId, 
            DatasetId = dataset, 
            TableId = "ByPostRequest" 
          }, 
          CreateDisposition = "CREATE_IF_NEEDED", 
        } 
      } 
    }, ProjectId, stream, "application/octet-stream").UploadAsync(); 
    Console.WriteLine(upload.Status); 
    Console.WriteLine(upload.Exception); 
  } 
} 
 | 
FiddlerによるAPIのエラー原因調査
少し今回のテーマとは異なるが、Google Client Libraryを実行する際のデバッグ方法を1つ紹介したい。
 リスト5のInsertメソッドの第4引数をapplication/octet-streamからapplication/jsonなどに変えてみてほしい。「パラメーターbaseUriに対するArgumentNullException」という例外が発生するはずだ。しかしこのパラメーターはGoogle Client Libraryの内部のメソッドのパラメーターであり、なぜこれがNullになるのか調べるのは、このメッセージからだけでは難しい。
そこでFiddlerなどのHTTP通信をプロキシでキャプチャするツールを使ってみてほしい。そのFiddlerで、実際にエラーの発生する状況での通信をキャプチャした結果が図1だ。
 「Invalid content type 'application/json'. Uploads must have content type 'application/octet-stream'.」と表示されており、ContentTypeの指定が間違っていることが分かる。このように一部例外の直接の原因が握りつぶされているケースがあるため、よく分からないエラーメッセージが出た場合は、HTTP通信をキャプチャするのが解決の糸口になることがある。
Streaming Insertによるデータ挿入
Streaming InsertはHTTP POSTによるデータを挿入し、すぐにクエリ対象とすることができるデータ挿入の方法である。Streaming Insertについては第1回でも紹介した。
 実際には、挿入するデータを1行につき1つのDictionary<string,object>インスタンスとしてAPIに渡して実行する。また、Streaming Insertは事前にテーブルが作成されている必要があるため、リスト6のコードはテーブルを作成する処理も含めている。挿入するデータは、リスト4やリスト5で挿入したJSONファイルと同じものである。
| 
 async Task CreateAndStreamginInsert() 
{ 
  var dataset = "test"; 
  var table = await Bigquery.Tables.Insert(new Table 
  { 
    Schema = new TableSchema 
    { 
      Fields = new [] 
          { 
            new TableFieldSchema 
            { 
              Name = "name", 
              Type = "STRING", 
              Mode = "REQUIRED" 
            }, 
            new TableFieldSchema 
            { 
              Name = "value", 
              Type = "INTEGER", 
              Mode = "REQUIRED" 
            } 
          } 
    }, 
    TableReference = new TableReference 
    { 
      ProjectId = ProjectId, 
      DatasetId = dataset, 
      TableId = "StreaminInsert" 
    } 
  }, ProjectId, dataset).ExecuteAsync(); 
  var response = await bigquery.Tabledata.InsertAll(new TableDataInsertAllRequest 
  { 
    Rows = new [] 
    { 
      new TableDataInsertAllRequest.RowsData 
      { 
        Json = new Dictionary<string, object> {["name"]="a",["value"]=3} 
      }, 
      new TableDataInsertAllRequest.RowsData 
      { 
        Json = new Dictionary<string, object> {["name"]="b",["value"]=5} 
      } 
    } 
  }, ProjectId, dataset, "StreaminInsert").ExecuteAsync(); 
  if (response.InsertErrors?.Any() == true) 
  { 
    foreach (var errorProto in response.InsertErrors.SelectMany(error => error.Errors)) 
    { 
      Console.WriteLine(errorProto.Message); 
    } 
  } 
  Console.WriteLine("Completed"); 
} 
 | 
なお、Streaming Insertには1回あたりに送信できるデータの量や1テーブルあたりに挿入できる流量に制限がある。送信するクライアント側で、この制限を上回らないように適宜バッファリングの実施などを考慮する必要があるだろう。また、流量の多いところでStreaming Insertを行うと、課金額も大きくなるため、流量(つまり課金額)の監視も必要になる。
まとめ
全5回と番外編の計6回にわたってBigQueryに関してC#エンジニアが利用するには、という視点で連載を行ってきた。BigQueryは比較的他のクラウド上のサービスと比べて独立しているため、たとえグーグル以外のクラウドやクラウド以外の環境でサービスが稼働している場合でも利用しやすいサービスだろう。また、.NETのクライアントライブラリがあるため、C#エンジニアでもC#のスキルを活用して利用できる。ぜひ、この連載を参考にBigQueryを使ってみてほしい。
※以下では、本稿の前後を合わせて5回分(第2回~第6回)のみ表示しています。
 連載の全タイトルを参照するには、[この記事の連載 INDEX]を参照してください。
2. Google API Client Library for .NETの使い方
BigQueryをはじめ、GoogleのほとんどのサービスはAPIが提供されている。これを.NETから利用するためのライブラリの基本的な使用方法を解説する。
5. LINQでBigQuery: データスキャン量を抑えたクエリの実行方法
膨大なデータへのクエリで、スキャン量を減らしてクエリの課金額を抑えるには? テーブルワイルドカード関数とテーブルデコレーターを説明する。