Cloud Functions(Python) で BigQuery Storage Write APIを叩く

    この投稿をXにポストするこの投稿をFacebookにシェアするこのエントリーをはてなブックマークに追加

ちょっと直近使うこともあったので簡単にメモとなります。
下記の記事などを参考にさせて頂いています。
https://zenn.dev/aldagram_tech/articles/8e09cff2bd74a1
https://qiita.com/m77so/items/8b0d608489c602f91a87

前置き

Storage Write APIのいいところ

  • 公式が推している(たしか)
  • offsetを用いたデータの格納の観点で、冪等席を担保しやすい構成である
  • gRPCを通して高速にデータの格納を行えるところ
  • insertALL(streaming api)に対して従量課金が半値になるほど安い
  • 詳細はこちら

Storage Write APIのうーんなところ

  • 下記の記事で書かれているような部分でしょうか
  • https://hack.nikkei.com/blog/advent20221221/#chapter5

ではさっそく

事前の要件

  • 分単位でファイルが吐き出され、BigQueryに格納したい
  • 吐き出されるファイルはGoogle Cloud Storageに配置される
  • 単純に書き込む形式ではなく、少しETLを実施の上で書き込みたい(複数の可変となるヘッダー行をSKIPする程度)
  • データはログに近しい形式であり、BQへの格納方法は追記(積み上げ)で問題ない

構成

  • 絵を書くのが面倒なので文字だけで書くと下記のシンプル構成
    • ① GCSにファイル配置される
    • ② Cloud Functionsが発火され、ファイル読み込み
    • ③ BQへの書き込みがStorage Write APIを通して行われる

① BigQuery上にテーブルを作成する

今回は転送したい情報が≒csvファイル形式だったので、それに合わせた形でテーブルを構築します

DROP table  `sample_pj.work.wr_storega_sample`;
CREATE or replace TABLE
 `sample_pj.work.wr_storega_sample`
(
  type_name STRING NOT NULL,
  create_date date NOT NULL,
  name1 STRING,
  name2 STRING,
  name3 STRING,
  name4 STRING,
  name5 STRING,
  name6 STRING,
  name7 STRING,
  name8 STRING,
  name9 STRING,
  name10 STRING,
)
PARTITION BY create_date
;

② Protocol Buffer定義を作成の上で、Pythonで扱うためのラッパーを作成する

  • これは事前にローカル(ubuntu)で実施しました
  • protocコマンドを叩くのですが、Pythonだとverが20.3以上じゃないとダメと言われ、updateかけるか、こちらから環境にあったものをダウンロードしてもらえればと思います
  • 私は、上記から最新をダウンロードし、面倒なので解凍された中のbin配下を直接叩きにいきました
syntax = "proto2";

message WrStoregaSample {
  required string type_name = 1;
  required string create_date = 2;
  required string name1  = 3;
  required string name2  = 4;
  required string name3  = 5;
  required string name4  = 6;
  required string name5  = 7;
  required string name6  = 8;
  required string name7  = 9;
  required string name8  = 10;
  required string name9  = 11;
  required string name10  = 12;
}

上記の定義に対して、protocを叩いてPythonファイルを作成します

protoc --python_out=. gs_log.proto

gs_log_pb2.pyが作成されたらOKです。こちらをmain側でimportしていきます。

③ローカルで疎通確認後、Functionsへ

  • ローカルで事前に疎通確認(ローカルに配置されたファイルがBQへ問題なく書き込みされることの確認)まで行いましたが、下記コードとほぼ同じなので割愛します
  • Functionsは第一世代を使用しました
  • トリガーはもちろんgcsを設定し、事前にgcs側にバケットを作成しておきました
  • 割り当てメモリは512MB、Pythonは3.7を利用しました
    • 意図としては、Pythonのversionが3.11やメモリを最小にした場合だと、デプロイ時にエラーが出てまい、StackOverflowなどを見ると上記が解決作と出ていたため、それで対応した形です。とりあえず動けばOKな検証なので、詳細は追及していませんmm

Functionsのフォルダ構成(インラインエディタ)としては下記

├─ main.py
├─ requirements.txt
├─ サービスアカウントのJsonファイル(検証なのでキーファイルをそのまま配置)
└─ gs_log_pb2.py(先ほど作ったものを)

ちなみにgcsに配置されるデータイメージは

ヘッダー行1
~
(可変の)ヘッダ行N番目(最終)
データのtype(見だし),%Y%m%d,データ中身1,データ中身1,~,データ中身10
データのtype(見だし),%Y%m%d,データ中身1,データ中身1,~,データ中身10
データのtype(見だし),%Y%m%d,データ中身1,データ中身1,~,データ中身10
~
データのtype(見だし),%Y%m%d,データ中身1,データ中身1,~,データ中身10

という感じ。前置きが長くなったが、これを読みだすFunctionsの本チャンコードはこちら

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2

from google.cloud import storage

import datetime
import os
import codecs

# Protocol Buffer 
import gs_log_pb2

# 事前に定義したテーブルに合わせたデータの作成
def create_row_data(type_name,create_date,name1,name2,name3,name4,name5,name6,name7,name8,name9,name10):
    row = gs_log_pb2.WrStoregaSample()
    row.type_name = type_name
    # 時刻が%Y%m%dの形式で入ってくるのでDataに合わせる
    row.create_date =  create_date[0:4] + "-" + create_date[4:6] + "-" + create_date[6:]
    row.name1 = name1
    row.name2 = name2
    row.name3 = name3
    row.name4 = name4
    row.name5 = name5
    row.name6 = name6
    row.name7 = name7
    row.name8 = name8
    row.name9 = name9
    row.name10 = name10
    return row.SerializeToString()


def insert_sensor_data(project_id: str, dataset_id: str, table_id: str, event):
    
    # サービスアカウントの認証
    CONFIDENTIAL_FILE = '同列に置いたサービスアカウントの名称.json'
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = CONFIDENTIAL_FILE

    # クライアント呼び出し
    storage_client = storage.Client()
    write_client = bigquery_storage_v1.BigQueryWriteClient()
    parent = write_client.table_path(project_id, dataset_id, table_id)
    write_stream = types.WriteStream()

    write_stream.type_ = types.WriteStream.Type.COMMITTED
    write_stream = write_client.create_write_stream(
        parent=parent, write_stream=write_stream
    )
    stream_name = write_stream.name

    request_template = types.AppendRowsRequest()
    request_template.write_stream = stream_name

    # BigQueryに送りつける入れ物の作成
    proto_schema = types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    gs_log_pb2.WrStoregaSample.DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data

    append_rows_stream = writer.AppendRowsStream(write_client, request_template)

    # gcsの情報読み出し & データのダウンロード
    bucket = storage_client.bucket(event['bucket'])
    blob = bucket.blob(event['name'])
    blob.download_to_filename("/tmp/" + str(event['name']))

    try:
        proto_rows = types.ProtoRows()
	#ファイルがshiftjisだった
        with codecs.open("/tmp/" + str(event['name']), 'r', 'shift_jis') as f:
            log_list = f.readlines()
            for s_data in log_list:
                d_list = s_data.rstrip("\n").split(",")
		# データ先頭のtypeに挙動判定を行う
                if len(d_list)  < 12 or d_list[0] != "○○":
                    pass
                else:
                    # 取得したデータを配列に入れていく
                    proto_rows.serialized_rows.append(create_row_data(
                        d_list[0],
                        d_list[1],
                        d_list[2],
                        d_list[3],
                        d_list[4],
                        d_list[5],
                        d_list[6],
                        d_list[7],
                        d_list[8],
                        d_list[9],
                        d_list[10],
                        d_list[11],

                        ))

        # 送信できるデータが有れば送信する
        if len(proto_rows.serialized_rows) > 0:
            request = types.AppendRowsRequest()
            request.offset = 0
            proto_data = types.AppendRowsRequest.ProtoData()
            proto_data.rows = proto_rows
            request.proto_rows = proto_data
            append_rows_stream.send(request)
    
        append_rows_stream.close()

def main(event, context):
    insert_sensor_data('BQのプロジェクト名', 'データセット名', '事前に作成したテーブル名',event)

一応、requirementsも

google-cloud-bigquery-storage
google-cloud-storage

結果

1ファイルは数千行程度の形のファイルだが、start(gcs配置後、発火)から end(BQへの格納)まで数秒で問題なく終わっていることが確認できました

めでたし、めでたし
ぜひ皆様もお試しあれ

    この投稿をXにポストするこの投稿をFacebookにシェアするこのエントリーをはてなブックマークに追加