Upgrading Managed Airflow (Cloud Composer) on GCP

สวัสดีค่ะ ล่าสุดที่บอเราเพิ่งมีการอัพเกรดตัว Managed Airflow หรือ Cloud Composer (ชื่อเก่า ที่เพิ่งเปลี่ยนชื่อมาหมาดๆ) เราเลยถือโอกาสเขียนเป็นโน้ตให้ตัวเอง และเผื่อแชร์ให้หลายท่านที่กำลังอัพเกรดด้วย

ตัว Cloud composer มีข้อดีที่เวลาอัพจะอัพค่อนข้างง่าย และสามารถกดอัพผ่าน UI ได้เลย (ถ้ามั่นใจ) แต่ข้อเสียก็นั่นล่ะ บังคับให้เราต้องอัพบ่อย ต้องอัพทุกๆปี ต้องจัดสรร workload ให้ทันไม่งั้นตกขบวน

สังเกต release date กับ support end date — อายุของ image 1 ปี ต้องขยันนิดนึง555


สามารถเช็คอายุขัย images ได้ อย่าลืมดูว่าใช้ Cloud composer gen อะไรด้วย
>> ตารางเช็ค Images

และแน่นอนว่าถ้ามันง่ายขนาดนั้น ก็คงไม่เกิดเป็นบทความนี้ ฮ่าๆ

จะเล่าความมันส์ให้ฟัง

ความ Challenge

เนื่องจากทีมเรามี development environment หลักๆคือ non-prod และ prod

สำหรับ data engineer แล้ว ตัว non-prod ก็ไม่ได้ถูกใช้ตลอดเวลา เพราะหลัง develop เสร็จ data ทุกอย่างก็ deploy ขึ้น prod อยู่แล้ว

ดังนั้นสำหรับการเปิดเครื่อง Cloud composer non-prod ตลอดเวลาทั้งจึงเปลือง cost มากและเราไม่ได้ใช้รันตลอดเวลา เราจึงใช้การเปิด-ปิดเครื่อง non-prod ในช่วง work hour หรือเวลาต้องใช้เท่านั้น

การเปิด-ปิดเครื่องเราใช้ terraform เป็นตัวช่วย เวลาปิดเครื่องก็แค่ ทำ save snapshot -> ปิดเครื่อง พอถึงเวลาใช้ เปิดเครื่อง -> load snapshot

แต่ปัญหาที่เจอคือ หาก image ที่ใช้อยู่หมดอายุละก็… หายนะ ! เพราะมันจะหา image นั้นไม่เจอ และบังคับอัพเกรดทันที ดังนั้นเครื่องที่ non-prod เราจึงโดนบังคับอัพเกรดใหม่เสมอ5555 แต่ถึงกระนั้นก็ยังโอเคเพราะยังไงเราก็ไม่เปิด dags ตลอดเวลาอยู่แล้ว

คราวนี้น้อง prod ที่เปิด 24/7 น้องไม่ได้รับผลกระทบอะไรเนื่องจากไม่ได้ถูกปิดเลย แต่ก็กลายเป็นคนล้าหลัง ถูกเพื่อนทิ้งไปโดยปริยาย

Challenge ของเราคือ non-prod ไปแล้วนะ แต่ prod ต้องรีบตามให้ทัน ทำให้ทีมต้องเช็คโค้ดดีๆ

เอาล่ะ มาอัพเกรดกัน

ปกติแล้ว ถ้าหากเราจะอัพเกรดเครื่อง Managed Airflow จริงๆเราสามารถกดอัพเกรดเครื่องเดิมได้เลย แต่สำหรับเคสนี้จะมีความเสี่ยงที่ dags จะแตกเนื่องจาก version ค่อนข้างห่าง

ตัวอย่าง กรณีหากอยากอัพเกรดเครื่องปัจจุบัน สามารถทำผ่าน UI และเช็ค conflict ได้เลย

และอีกอย่างคือ ถ้าอัพเกรดเครื่องเดิมเลย เราก็จะทำการ roll back ไม่ได้ เนื่องจากไม่มี image ให้ดึงแล้ว

เราเลยเลือกใช้ blue/green deployment นั่นคือการที่เราเปิดเครื่อง Managed Airflow เครื่องใหม่ที่เป็น version ใหม่ ควบคู่กับตัวเก่า(ที่ปิด dag แล้ว) เผื่อในกรณีที่หากตัวใหม่ไม่ work เราจะยังสามารถกลับไปใช้ตัวเก่า แล้วเปิด dag นั้นๆได้

blue/green deployment

Step จึงเป็นดังนี้

  1. เตรียม Terraform files ที่มี version ใหม่
  2. ปิด dags ทั้งหมดใน Airflow ตัวเก่า (blue)
  3. Save Snapshot ตัว Managed Airflow ตัวเก่าไปที่ Google Cloud Storage (~10–20 mins)
  4. สร้างเครื่อง Managed Airflow version ใหม่ (green)
  5. Load snapshot ตัวเดิมไปที่ Managed Airflow ตัวใหม่
  6. เมิจ dags code ชุดใหม่ที่ compatible กับ Managed Airflow version ใหม่
  7. ** เช็คว่าทุกอย่างโอเคไหม ** พวก connections, error ต่างๆ
  8. เปิด dags บางส่วน เช่น dags ที่สามารถใช้เทสเพื่อดูว่าสามารถทำงานได้ไหม เช่น Airflow สามารถต่อไปที่ Source DB, BigQuery, หรือ reverse ETL กลับไปที่ Source DB ได้เช่นกัน รวมถึง เทสให้มี failed dags ด้วย เพื่อทดสอบว่าระบบ notification ยังใช้งานได้เหมือนเดิม หากไม่มีปัญหา ก็เปิดจนครบคล้ายตัวเดิม
  9. ถ้าดูแล้วทุกอย่างโอเค วันถัดมาค่อยปิดเครื่องได้ เย้

โดย step ที่ 7 กับ 8 จะเป็นตัวกำหนดว่าเราจะ go หรือ no-go ถ้ามีปัญหาที่เครื่องใหม่ปุ๊บ สามารถถอยได้ทันทีด้วยการปิดตัวใหม่ และเปิด dags ที่ Managed Airflow ตัวเก่า

และแน่นอนว่าถ้าหากมันง่ายขนาดนั้น คงไม่ได้เขียนบทความนี้ ฮ่าๆ

What surprised us

1. ขั้นตอน Load snapshot failed เพราะ airflow configuration overrides มันตีกัน

วิธีแก้: load snapshot ด้วย option skip setting ต่างๆ

ซึ่งเราสามารถเลือกได้ว่าจะ skip อะไรบ้าง โดยเราเลือกดังนี้

  • skip PyPi packages installation
  • skip setting environement variables
  • skip setting Airflow configuration overrides

**เหตุผลที่เลือก skip ได้เพราะเรา define config พวกนี้ใน Terraform อยู่แล้ว จึงไม่มีปัญหา

จริงๆสามารถใช้ผ่าน CLI ได้ด้วย

gcloud composer environments snapshots load \
DESTINATION_ENVIRONMENT_NAME \
--location LOCATION \
--snapshot-path "SNAPSHOT_PATH" \
--skip-airflow-overrides-setting \
--skip-environment-variables-setting \
--skip-pypi-packages-installation \
--skip-gcs-data-copying

2. หา Module บางอย่างไม่เจอ

No module named ‘airflow.providers.postgres.operators’
แม้ว่าใน default image จะมีอยู่แล้ว

วิธีแก้: เราสามารถ override PyPi package ได้

เคสนี้ไม่ surprise เท่าไร เนื่องจากมีตอนอัพเกรดที่ dev, sit, uat เจอแล้วรอบนึง

before over Python package
After override Python package

3. ลืมเพิ่ม Firewall เพื่อให้ Airflow เข้าถึง DB ได้ !

เนื่องจากการอัพเกรดครั้งนี้ เรามีย้าย Managed Airflow ไปวง VPC ตัวใหม่ด้วย คือถึงแม้ว่า VPC วงใหม่จะอยู่วงเดียวกับ destination DB แล้ว แต่ลืมไปว่าต้องแอด IP เพิ่มใน Firewall ด้วย

วิธีแก้: อย่าลืมเช็คเรื่อง network ดีๆ

มีวิธีเช็คไวๆด้วยการ drop dag file ที่เขียนขึ้นมาลงใน GCS เพื่อเช็ค connectivity ที่ต่อไปหา destination โดยเฉพาะ

Example dag file concept:

"""
SFTP connectivity test DAG - tests all reverse ETL SFTP servers
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import logging
default_args = {
'owner': 'airflow',
'retries': 0,
'start_date': datetime(2026, 5, 6),
}
dag = DAG(
'test_sftp_connectivity',
default_args=default_args,
description='Test SFTP connectivity to all reverse ETL servers',
schedule_interval=None,
catchup=False,
tags=['testing'],
)
# SFTP configurations to test
SFTP_CONFIGS = {
"DESTINATION_1": {
<< sftp connection detail >>
...
},
}
def test_sftp_connection(sftp_name, sftp_config):
"""Test SFTP connection to a specific server"""
try:
...
<<<< Python code to check connectivity here >>>
...
logging.info(f"✅ {sftp_name} SFTP connection successful!")
except Exception as e:
logging.error(f"❌ {sftp_name} SFTP failed: {e}")
raise
# Create a task for each SFTP server
for sftp_name, sftp_config in SFTP_CONFIGS.items():
task = PythonOperator(
task_id=f"test_{sftp_name.lower()}_sftp",
python_callable=test_sftp_connection,
op_kwargs={"sftp_name": sftp_name, "sftp_config": sftp_config},
dag=dag,
)

Special thanks

ทั้งนี้การอัพเกรดครั้งนี้ต้องขอบคุณน้องเพียว devops ที่ช่วยเตรียม terraform files ให้และปรับ CI/CD ให้เป็น Airflow ตัวใหม่ 🙏🏻

Comment and Feedback

Leave a comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.