· データ処理とワークフロー · 3 min read
Cloud Dataflow を使ったストリームデータ処理の実装
Cloud Dataflow でリアルタイムデータ処理パイプラインを構築する方法を解説します

Cloud Dataflow とは?
Cloud Dataflow は Google Cloud のフルマネージドなデータ処理サービスです。リアルタイムのストリームデータや大規模なバッチデータを処理できます。
主な用途
- IoT デバイスからのデータ処理
- ログ分析
- リアルタイム分析
- ETL/ELT 処理
基本的なパイプライン作成
1. Apache Beam を使った実装
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# パイプラインの作成
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
# データの読み込みと処理
(p
| 'ReadData' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-topic')
| 'ParseJSON' >> beam.Map(json.loads)
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
'my-project:dataset.table'))
2. データの変換処理
def process_data(element):
return {
'timestamp': element['timestamp'],
'value': float(element['value']),
'device_id': element['device_id']
}
# 変換処理を追加
| 'ProcessData' >> beam.Map(process_data)
リアルタイム処理の実装
1. ウィンドウ処理
# 5分間の固定ウィンドウを作成
fixed_windows = beam.WindowInto(
beam.window.FixedWindows(5 * 60))
# 集計処理
sum_values = (windowed_data
| 'Sum' >> beam.CombinePerKey(sum))
2. 遅延データの処理
# 遅延データを許容するウィンドウ設定
window_config = beam.WindowInto(
beam.window.FixedWindows(5 * 60),
trigger=AfterWatermark(
early=AfterCount(100),
late=AfterCount(3)),
allowed_lateness=60 * 5)
モニタリングと運用
1. パイプラインのモニタリング
- スループット
- 処理遅延
- エラー率
2. スケーリング設定
# ワーカー数の設定
pipeline_options = PipelineOptions(
runner='DataflowRunner',
project='my-project',
job_name='streaming-job',
num_workers=5,
max_num_workers=10,
autoscaling_algorithm='THROUGHPUT_BASED'
)
実践的なユースケース
1. IoT センサーデータの処理
# センサーデータの処理パイプライン
def process_sensor_data(element):
return {
'sensor_id': element['id'],
'temperature': float(element['temp']),
'humidity': float(element['humidity']),
'timestamp': element['time']
}
sensor_data = (p
| 'ReadSensors' >> beam.io.ReadFromPubSub(topic=topic_path)
| 'ParseSensor' >> beam.Map(process_sensor_data)
| 'WriteSensor' >> beam.io.WriteToBigQuery(table_path))
2. ログ分析パイプライン
# エラーログの抽出と集計
def parse_log(element):
data = json.loads(element)
return (data['error_code'], 1)
error_counts = (logs
| 'ParseLogs' >> beam.Map(parse_log)
| 'CountErrors' >> beam.CombinePerKey(sum))
パフォーマンスチューニング
1. パーティション設定
# キーによるパーティション分割
| 'Partition' >> beam.Partition(
lambda elem, num_partitions: hash(elem) % num_partitions,
num_partitions=10)
2. Fusion の最適化
- ParDo 操作の結合
- 適切なバッチサイズの設定
より詳しい実装方法については、以下の書籍がお勧めです:
『Google Cloud Platform 実践 ビッグデータ分析基盤開発』 https://amzn.to/3XNIcw9