ちょっと直近使うこともあったので簡単にメモとなります。
下記の記事などを参考にさせて頂いています。
https://zenn.dev/aldagram_tech/articles/8e09cff2bd74a1
https://qiita.com/m77so/items/8b0d608489c602f91a87
前置き
Storage Write APIのいいところ
- 公式が推している(たしか)
- offsetを用いたデータの格納の観点で、冪等席を担保しやすい構成である
- gRPCを通して高速にデータの格納を行えるところ
- insertALL(streaming api)に対して従量課金が半値になるほど安い
- 詳細はこちら
Storage Write APIのうーんなところ
- 下記の記事で書かれているような部分でしょうか
- https://hack.nikkei.com/blog/advent20221221/#chapter5
ではさっそく
事前の要件
- 分単位でファイルが吐き出され、BigQueryに格納したい
- 吐き出されるファイルはGoogle Cloud Storageに配置される
- 単純に書き込む形式ではなく、少しETLを実施の上で書き込みたい(複数の可変となるヘッダー行をSKIPする程度)
- データはログに近しい形式であり、BQへの格納方法は追記(積み上げ)で問題ない
構成
- 絵を書くのが面倒なので文字だけで書くと下記のシンプル構成
- ① GCSにファイル配置される
- ② Cloud Functionsが発火され、ファイル読み込み
- ③ BQへの書き込みがStorage Write APIを通して行われる
① BigQuery上にテーブルを作成する
今回は転送したい情報が≒csvファイル形式だったので、それに合わせた形でテーブルを構築します
DROP table `sample_pj.work.wr_storega_sample`;
CREATE or replace TABLE
`sample_pj.work.wr_storega_sample`
(
type_name STRING NOT NULL,
create_date date NOT NULL,
name1 STRING,
name2 STRING,
name3 STRING,
name4 STRING,
name5 STRING,
name6 STRING,
name7 STRING,
name8 STRING,
name9 STRING,
name10 STRING,
)
PARTITION BY create_date
;
② Protocol Buffer定義を作成の上で、Pythonで扱うためのラッパーを作成する
- これは事前にローカル(ubuntu)で実施しました
- protocコマンドを叩くのですが、Pythonだとverが
20.3
以上じゃないとダメと言われ、updateかけるか、こちらから環境にあったものをダウンロードしてもらえればと思います - 私は、上記から最新をダウンロードし、面倒なので解凍された中のbin配下を直接叩きにいきました
syntax = "proto2";
message WrStoregaSample {
required string type_name = 1;
required string create_date = 2;
required string name1 = 3;
required string name2 = 4;
required string name3 = 5;
required string name4 = 6;
required string name5 = 7;
required string name6 = 8;
required string name7 = 9;
required string name8 = 10;
required string name9 = 11;
required string name10 = 12;
}
上記の定義に対して、protocを叩いてPythonファイルを作成します
protoc --python_out=. gs_log.proto
gs_log_pb2.py
が作成されたらOKです。こちらをmain側でimportしていきます。
③ローカルで疎通確認後、Functionsへ
- ローカルで事前に疎通確認(ローカルに配置されたファイルがBQへ問題なく書き込みされることの確認)まで行いましたが、下記コードとほぼ同じなので割愛します
- Functionsは第一世代を使用しました
- トリガーはもちろんgcsを設定し、事前にgcs側にバケットを作成しておきました
- 割り当てメモリは
512MB
、Pythonは3.7
を利用しました- 意図としては、Pythonのversionが
3.11
やメモリを最小にした場合だと、デプロイ時にエラーが出てまい、StackOverflowなどを見ると上記が解決作と出ていたため、それで対応した形です。とりあえず動けばOKな検証なので、詳細は追及していませんmm
- 意図としては、Pythonのversionが
Functionsのフォルダ構成(インラインエディタ)としては下記
├─ main.py
├─ requirements.txt
├─ サービスアカウントのJsonファイル(検証なのでキーファイルをそのまま配置)
└─ gs_log_pb2.py(先ほど作ったものを)
ちなみにgcsに配置されるデータイメージは
ヘッダー行1
~
(可変の)ヘッダ行N番目(最終)
データのtype(見だし),%Y%m%d,データ中身1,データ中身1,~,データ中身10
データのtype(見だし),%Y%m%d,データ中身1,データ中身1,~,データ中身10
データのtype(見だし),%Y%m%d,データ中身1,データ中身1,~,データ中身10
~
データのtype(見だし),%Y%m%d,データ中身1,データ中身1,~,データ中身10
という感じ。前置きが長くなったが、これを読みだすFunctionsの本チャンコードはこちら
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
from google.cloud import storage
import datetime
import os
import codecs
# Protocol Buffer
import gs_log_pb2
# 事前に定義したテーブルに合わせたデータの作成
def create_row_data(type_name,create_date,name1,name2,name3,name4,name5,name6,name7,name8,name9,name10):
row = gs_log_pb2.WrStoregaSample()
row.type_name = type_name
# 時刻が%Y%m%dの形式で入ってくるのでDataに合わせる
row.create_date = create_date[0:4] + "-" + create_date[4:6] + "-" + create_date[6:]
row.name1 = name1
row.name2 = name2
row.name3 = name3
row.name4 = name4
row.name5 = name5
row.name6 = name6
row.name7 = name7
row.name8 = name8
row.name9 = name9
row.name10 = name10
return row.SerializeToString()
def insert_sensor_data(project_id: str, dataset_id: str, table_id: str, event):
# サービスアカウントの認証
CONFIDENTIAL_FILE = '同列に置いたサービスアカウントの名称.json'
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = CONFIDENTIAL_FILE
# クライアント呼び出し
storage_client = storage.Client()
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
write_stream = types.WriteStream()
write_stream.type_ = types.WriteStream.Type.COMMITTED
write_stream = write_client.create_write_stream(
parent=parent, write_stream=write_stream
)
stream_name = write_stream.name
request_template = types.AppendRowsRequest()
request_template.write_stream = stream_name
# BigQueryに送りつける入れ物の作成
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
gs_log_pb2.WrStoregaSample.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# gcsの情報読み出し & データのダウンロード
bucket = storage_client.bucket(event['bucket'])
blob = bucket.blob(event['name'])
blob.download_to_filename("/tmp/" + str(event['name']))
try:
proto_rows = types.ProtoRows()
#ファイルがshiftjisだった
with codecs.open("/tmp/" + str(event['name']), 'r', 'shift_jis') as f:
log_list = f.readlines()
for s_data in log_list:
d_list = s_data.rstrip("\n").split(",")
# データ先頭のtypeに挙動判定を行う
if len(d_list) < 12 or d_list[0] != "○○":
pass
else:
# 取得したデータを配列に入れていく
proto_rows.serialized_rows.append(create_row_data(
d_list[0],
d_list[1],
d_list[2],
d_list[3],
d_list[4],
d_list[5],
d_list[6],
d_list[7],
d_list[8],
d_list[9],
d_list[10],
d_list[11],
))
# 送信できるデータが有れば送信する
if len(proto_rows.serialized_rows) > 0:
request = types.AppendRowsRequest()
request.offset = 0
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
append_rows_stream.close()
def main(event, context):
insert_sensor_data('BQのプロジェクト名', 'データセット名', '事前に作成したテーブル名',event)
一応、requirementsも
google-cloud-bigquery-storage
google-cloud-storage
結果
1ファイルは数千行程度の形のファイルだが、start(gcs配置後、発火)から end(BQへの格納)まで数秒で問題なく終わっていることが確認できました
めでたし、めでたし
ぜひ皆様もお試しあれ