· データ処理とワークフロー  · 3 min read

Cloud Dataflow を使ったストリームデータ処理の実装

Cloud Dataflow でリアルタイムデータ処理パイプラインを構築する方法を解説します

Cloud Dataflow でリアルタイムデータ処理パイプラインを構築する方法を解説します

Cloud Dataflow とは?

Cloud Dataflow は Google Cloud のフルマネージドなデータ処理サービスです。リアルタイムのストリームデータや大規模なバッチデータを処理できます。

Dataflow の概要

主な用途

  • IoT デバイスからのデータ処理
  • ログ分析
  • リアルタイム分析
  • ETL/ELT 処理

基本的なパイプライン作成

1. Apache Beam を使った実装

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# パイプラインの作成
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    
    # データの読み込みと処理
    (p 
     | 'ReadData' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-topic')
     | 'ParseJSON' >> beam.Map(json.loads)
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         'my-project:dataset.table'))

2. データの変換処理

def process_data(element):
    return {
        'timestamp': element['timestamp'],
        'value': float(element['value']),
        'device_id': element['device_id']
    }

# 変換処理を追加
| 'ProcessData' >> beam.Map(process_data)

リアルタイム処理の実装

1. ウィンドウ処理

# 5分間の固定ウィンドウを作成
fixed_windows = beam.WindowInto(
    beam.window.FixedWindows(5 * 60))

# 集計処理
sum_values = (windowed_data
    | 'Sum' >> beam.CombinePerKey(sum))

2. 遅延データの処理

# 遅延データを許容するウィンドウ設定
window_config = beam.WindowInto(
    beam.window.FixedWindows(5 * 60),
    trigger=AfterWatermark(
        early=AfterCount(100),
        late=AfterCount(3)),
    allowed_lateness=60 * 5)

モニタリングと運用

1. パイプラインのモニタリング

  • スループット
  • 処理遅延
  • エラー率

Dataflow モニタリング

2. スケーリング設定

# ワーカー数の設定
pipeline_options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',
    job_name='streaming-job',
    num_workers=5,
    max_num_workers=10,
    autoscaling_algorithm='THROUGHPUT_BASED'
)

実践的なユースケース

1. IoT センサーデータの処理

# センサーデータの処理パイプライン
def process_sensor_data(element):
    return {
        'sensor_id': element['id'],
        'temperature': float(element['temp']),
        'humidity': float(element['humidity']),
        'timestamp': element['time']
    }

sensor_data = (p
    | 'ReadSensors' >> beam.io.ReadFromPubSub(topic=topic_path)
    | 'ParseSensor' >> beam.Map(process_sensor_data)
    | 'WriteSensor' >> beam.io.WriteToBigQuery(table_path))

2. ログ分析パイプライン

# エラーログの抽出と集計
def parse_log(element):
    data = json.loads(element)
    return (data['error_code'], 1)

error_counts = (logs
    | 'ParseLogs' >> beam.Map(parse_log)
    | 'CountErrors' >> beam.CombinePerKey(sum))

パフォーマンスチューニング

1. パーティション設定

# キーによるパーティション分割
| 'Partition' >> beam.Partition(
    lambda elem, num_partitions: hash(elem) % num_partitions,
    num_partitions=10)

2. Fusion の最適化

  • ParDo 操作の結合
  • 適切なバッチサイズの設定

より詳しい実装方法については、以下の書籍がお勧めです:

『Google Cloud Platform 実践 ビッグデータ分析基盤開発』 https://amzn.to/3XNIcw9

参考資料

Back to Blog

Related Posts