[Airflow] ย้ายไฟล์จาก GCS ไป BigQuery ด้วย GoogleCloudStorageToBigQueryOperator กัน !

สวัสดีค่ะทุกคน วันนี้เราจะมาเล่น Operator ตัวนึงของ Airflow กัน !
โดยวันนี้เราจะนำเสนอเจ้าตัว GoogleCloudStorageToBigQueryOperator (ชื่อยาวมาก)

ด้วยเหตุที่ว่า เรามีโจทย์ในการดึง data เข้า BigQuery ค่ะ โดยที่เราจะ write file ไว้บนเครื่อง local ก่อนและจะนำเข้า BigQuery

อันที่จริงเราก็จะเขียน CSV file แล้ว upload ผ่าน bq command โดยตรงด้วย

bq load

ก็ได้

แต่เจ้ากรรมนายเวร BigQuery ที่รักเขาติด limit รับไฟล์ได้ไม่เกิน 10MB ค่ะ

ดังนั้นเราก็จะมีตัวช่วยอีกตัวมาคั่นกลาง คือ Google Cloud Storage นี่เอง โดยเราจะให้ GCS เป็นตัวพักไฟล์ เพราะเขาจะช่วยทำให้ไม่จำกัดขนาดของไฟล์ที่เราเขียนในการจะย้ายเข้า BigQuery ค่ะ

สรุป Pipeline ของเราก็จะเป็นประมาณนี้

Write csv file > Send to GCS > Send GCS file to BigQuery

โดยการ upload file ขึ้น GCS เราใช้วิธีเขียน script เอา (วันนี้นางไม่ใช่พระเอก ขอข้ามไปก่อน แฮะๆ)

def upload_blob(bucket_name, source_file_name, destination_blob_name):
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
print(
"File {} uploaded to {}.".format(
source_file_name, destination_blob_name
)
)

ต่อไปจะเป็นส่วน Airflow โดยที่พระเอกวันนี้ของเราคือ GoogleCloudStorageToBigQueryOperator

โดยเราจะเขียนเป็น

from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
task_id=’gcs_to_bq’,
bucket=’mils-bucket’, #ชื่อ bucket
source_objects=[
f’demo/data/ga_event_20200123.csv’], #ชื่อ ไฟล์บน GCS
destination_project_dataset_table=f’mils_bq.ga_event_20200123', #ชื่อ <Bigquery dataset>.<ชื่อ table>
source_format=’CSV’,
schema_fields=schema_ga_event,
create_disposition=’CREATE_IF_NEEDED’,
write_disposition=’WRITE_TRUNCATE’,
bigquery_conn_id=’my_gcp_connection’,
autodetect=False,
dag=dag
)

Note: ชื่อ table บน BigQuery เป็น format `%Y%m%d` นะคะ เช่น 20200123 เพื่อให้มันควบรวม table ไว้ กรณีเรา schedule ให้มันรันทุกวัน

คราวนี้ตัวน่าปวดหัวมาแล้ว
เราจำเป็นจะต้อง ใส่ schema type (ชื่อ column) ให้ BigQuery ด้วยค่ะ (ตรง attribute: schema_fields)
มันจะต้องอยู่ในรูปแบบนี้

[
{
"mode": "NULLABLE",
"name": "sessions",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "users",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "pageviews",
"type": "STRING"
},
...
]

ซึ่งมันยาวมากกกกกกก ยิ่งถ้าเรามี column เยอะๆจะทำให้ไฟล์ DAG เรายาวมาก
 และด้วยความขี้เกียจของเราบวกกับศักดิ์ศรีโปรแกรมเมอร์ของเราแล้ว เราจะไม่เขียนเองค่ะ ! เสียเวลา

เราก็จะใช้ lib ที่ชื่อว่า bigquery-schema-generator เป็นคนสร้าง

โดยเราจะสร้าง schema file เป็น JSON จากไฟล์ csv ที่เรามี data ค่ะ ด้วยคำสั่ง

generate-schema --input_format csv < ga_event_20210123.csv > schema_data/file_schema.json
## generate-schema --input_format csv < {ชื่อไฟล์} > {ชื่อไฟล์ JSON}

ที่นี่เราก็จะประกาศไว้ข้างบน ทำให้ task ให้ไฟล์เป็นแบบนี้

### เพิ่มตรงนี้มา อย่าลืม import json นะ
with open(f'/usr/local/airflow/scripts/data/schema_data/file_schema.json') as json_file:
schema_ga_event = json.load(json_file)
GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq',
bucket='mils-bucket',
source_objects=[
f'demo/data/ga_event_{yesterday}.csv'],
destination_project_dataset_table=f'mils_bq.ga_event_{yesterday}',
source_format='CSV',
schema_fields=schema_ga_event,
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='my_gcp_connection',
autodetect=False,
dag=dag
)
```

และนี่คือไฟล์ DAGแบบเต็มๆ

from airflow.utils.dates import days_ago
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.operators.bash_operator import BashOperator
import json
from airflow import DAG
from datetime import timedelta, date, datetime
import pendulum
local_tz = pendulum.timezone("Asia/Bangkok")
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'get_ga_data_from_api_to_bq',
default_args=default_args,
description='Get Google Analytics data as CSV file and move to GCS then to BQ',
schedule_interval='0 4 * * *',
start_date=datetime(2021, 1, 18, tzinfo=local_tz),
tags=['example'],
)
today = date.today()
yesterday = (today – timedelta(days=1)).strftime("%Y%m%d")
command_1 = "python /usr/local/airflow/scripts/query_predictive.py -table ga_event -metric METRICS_1 -dimension DIMENSIONS_1"
t1 = BashOperator(
task_id='get_ga_data',
bash_command=command_1,
retries=1,
dag=dag
)
with open(f'/usr/local/airflow/scripts/data/schema_data/file_schema.json') as json_file:
schema_ga_event = json.load(json_file)
GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq',
bucket='mils-bucket',
source_objects=[
f'demo/data/ga_event_{yesterday}.csv'],
destination_project_dataset_table=f'mils_bq.ga_event_{yesterday}',
source_format='CSV',
schema_fields=schema_ga_event,
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='my_gcp_connection',
autodetect=False,
dag=dag
)
t1 >> GCS_to_BQ

ที่นี้ค่ะ มาถึงจุดสำคัญอีกสุดคือเราจะติดต่อให้น้อง Airflow ของเรารู้จัก Bigquery ด้วยการสร้าง Connections ค่ะ
ให้เรากดไปที่ Admin > Connections 
แล้วกดปุ่ม + สร้าง connections ตัวใหม่ขึ้นมา

เราจะตั้งชื่อเขาว่า my_gcp_connection
โดยจะมี setting ตามนี้

Conn id: my_gcp_connection
Conn Type: Google Cloud Platform
Project id: <ชื่อ project id >
Keyfile Path: <path ไป json key>
Keyfile JSON: <copy ของที่อยู่ในไฟล์ jsonมาแปะ>
Scopes(comma separated): https://www.googleapis.com/auth/cloud-platform
Number of Retries: 1
ตัวอย่างในรูป

ที่นี้เราก็ไปกดรัน DAG ดู และสามารถนำ table ขึ้น BigQuery ได้แล้วเย้ๆๆๆ

คราวนี้เราก็จะเห็น table เข้า BigQuery อย่างสวยงาม ~

ศึกษาเจ้าตัวนี้เพิ่มเติมได้ที่ Airflow Doc เลยนะคะ

ขอบคุณที่อ่านถึงตรงนี้และ Happy Coding ค่ะทุกคน !

ก่อนจาก ฝากน้องเห็ด Airflow ไว้เป็นกำลังใจให้ทุกท่าน

Comment and Feedback

Leave a comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.