ลองสร้าง SFTP server ให้ Airflow ไปวางไฟล์ให้หน่อย

บทความนี้เราจะจำลองสร้าง SFTP server กัน โดย use case ของเราคือเราอยากดึงข้อมูลจาก BigQuery ไปวางไว้ที่ Google Cloud Storage แล้วต่อไปยัง SFTP server แบบ automation ทั้งหมด โดยควบคุมผ่านเจ้าตัว Airflow นั่นเอง

SFTP คืออะไร?

SFTP ย่อมาจาก Secure File Transfer Protocol คือ protocol หนึ่งใช้สำหรับการ transfer ไฟล์โดยผ่านช่องทาง File Transfer Protocol (FTP) โดยผ่าน SSH protocol ซึ่งเป็นช่องทางที่ secure ในการส่งข้ามข้อมูล

Set up SFTP server on Compute Engine

ซึ่งก่อนอื่นเราต้องมี SFTP ปลายทางก่อน ดังนั้นเราจะจำลองสร้าง server ขึ้นมาจาก Compute Engine

ไปที่ Compute Engine > VM instances แล้วกด create instance ขึ้นมา

จากนั้นเราก็ตั้งชื่อ server ของเรา แล้วเซ็ตไซส์เครื่องที่ E2-small

เนื่องจากเราตั้ง server นี้เฉพาะกิจ เลยไม่ได้ต้องการเครื่องที่ใหญ่อะไรมากมาย เอาเครื่องเล็กๆเบาๆ

ตรง Boot Disk ด้านล่างให้กดเปลี่ยน OS เป็น CentOS ตาม config ดังรูป

จากนั้น รอสักพักนึงเพื่อให้ server เราสร้างเสร็จ

เมื่อสร้างเสร็จเราจะเห็นเครื่องหมายติ๊กถูกด้านหน้า และ External IP ของ server

จากนั้นให้เรากดปุ่ม SSH เพื่อ Secure Shell เข้าสู่ server แบบง่ายๆผ่าน browser

คราวนี้เราจะสร้าง user เพื่อเข้าใช้งานแบบเป็น Superuser โดยใช้คำสั่ง useradd

$ sudo useradd <username>$ sudo passwd <username>

example: อย่าก็อปเพลิน นะจ๊ะ

[burasakorn_s@mils-sftp-server /]$ sudo useradd mils[burasakorn_s@mils-sftp-server /]$ sudo passwd milsChanging password for user mils.New password: Retype new password: passwd: all authentication tokens updated successfully.

คราวนี้เราเข้าไปแก้ config ไฟล์สักหน่อย ผ่านตัว vim

[burasakorn_s@mils-sftp-server /]$ sudo vim /etc/ssh/sshd_config

เข้าไปโดยใช้คำสั่ง i เพื่อแก้ไฟล์ sshd_config เพื่อบอกว่าในการที่จะ authen ผ่านเครื่อง จะเป็นการ authen แบบใช้ user/password เท่านั้น เพราะหากเซ็ต PasswordAuthentication เป็น no นั้น จะต้อง authen ผ่าน public key

# To disable tunneled clear text passwords, change to no here!PasswordAuthentication yes#PermitEmptyPasswords no#PasswordAuthentication no

คราวนี้เมื่อเราแก้ไขเสร็จแล้ว ให้กด :wq เพื่อเซฟแล้วออกมา

จากนั้นเราก็ restart sshd แล้วเช็ค status สักหน่อย

[burasakorn_s@test-for-sftp ~]$ sudo systemctl restart sshd[burasakorn_s@test-for-sftp ~]$ sudo systemctl status sshd

Test Connection

คราวนี้เราจะทดสอบว่าเราสามารถ access ผ่านเข้า SFTP server ของเราได้ไหม

มี 2 วิธี จะใช้ CLI หรือโปรแกรมเสริมก็ได้

  1. CLI
$ sftp <username>@<ip_or_hostname>

2.ใช้โปรแกรมเสริม อันนี้เราใช้ Filezilla

โดยตั้งค่าตามรูปด้านล่าง

** SFTP เป็น port 22

ผลที่ได้ จะเห็นได้ว่าเราสามารถ connect เข้า SFTP server เราได้แล้วที่ path /home/mils

หาก connect ไม่ได้ ให้ลองเช็คเรื่อง firewall ในการ allow port 22 ว่าเข้าสู่วง VPC ปลายทางของ GCP project นี้ไหม


Data Source

คราวนี้สิ่งที่เราอยากทำคือ เราอยาก export BigQuery view ออกมาแบบ automate

หน้าตาของ view

Airflow side

step นี้เราจะไม่สอนเซ็ตอัพ Airflow นะ ขออนุญาติข้ามไปเลย (หรือรอจนกว่าเราจะเขียน555)

ทีนี้หากมี Airflow แล้ว เราจะสร้าง DAG หน้าตาประมาณนี้

DAG file

import pathlib
from datetime import datetime
import pendulum
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.google.cloud.operators.bigquery import \
BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_sftp import \
GCSToSFTPOperator
from airflow.utils.task_group import TaskGroup
tz = pendulum.timezone("Asia/Bangkok")
default_args = {
"owner": "burasakorn.sab@cjexpress.co.th",
"depends_on_past": False,
"retries": 0,
}
with DAG(
dag_id="copy_data_from_bq_to_sftp",
default_args=default_args,
schedule_interval="0 9 * * *",
start_date=datetime(2022, 9, 2, tzinfo=tz),
catchup=False,
) as dag:
start_pipeline = DummyOperator(task_id="start_pipeline")
end_pipeline = DummyOperator(task_id="end_pipeline")
prepare_file_complete = DummyOperator(
task_id="prepare_file_completed", trigger_rule="all_done"
)
table_date = "{{ data_interval_start.format('YYYY-MM-DD') }}" # current date -1
bucket_name = "<bucket_name>"
project_name = "<project_id>"
dataset_name = "<dataset>"
bq_table_name = "<bq_view_name>"
query_bq_to_gcs = BigQueryInsertJobOperator(
task_id=f"customer_event_view_to_gcs",
location="asia-southeast1",
configuration={
"query": {
"query": f"""
EXPORT DATA OPTIONS (
uri = 'gs://{bucket_name}/serving/customer_event_{table_date}_*.csv',
format = 'CSV',
overwrite = true,
header = true,
field_delimiter = ';'
) AS
SELECT * FROM `{project_name}.{dataset_name}.{bq_table_name}` order by 1;
""",
"useLegacySql": False,
}
},
)
copy_dir_from_gcs_to_sftp = GCSToSFTPOperator(
task_id="dir-copy-gcs-to-sftp",
sftp_conn_id="sftp-conn",
source_bucket=f"{bucket_name}",
source_object=f"serving/*.csv",
destination_path="/home/mils", ## path in server
keep_directory_structure=False, ## False for copy file only
)
start_pipeline >> query_bq_to_gcs >> prepare_file_complete >> copy_dir_from_gcs_to_sftp >> end_pipeline

เริ่มต้นจากเราใช้ query ดึงจาก BigQuery view โดยที่ task นี้สามารถ export ส่งข้อมูลไปที่ Cloud storage ได้เลย ซึ่งเราจะใช้เป็น BigQueryInsertJobOperator ซึ่งภายในจะเป็น SQL ในการ export view ได้

ซึ่งปกติหากเราใช้ BigQueryToGCSOperator ธรรมดาจะทำไม่ได้ เนื่องจากติดเรื่อง เราไม่สามารถ export ข้อมูลจาก view ได้ ต้องขอบคุณพี่วัน Siriwan ที่แนะนำท่าดีๆแบบนี้

คราวนี้ให้เข้าไปสร้าง connection โดยมี extra เป็น json ว่า “no_host_key_check” เป็น true เพื่อที่ว่าเราจะได้ไม่ต้องใช้ host keys เก็บอยู่ใน known_hosts ปลายทาง

เสร็จแล้วเมื่อเซ็ตทุกอย่างเสร็จ และกดรัน DAG

ผลลัพท์ มีไฟล์ขึ้นแล้วเย้

สรุป

บทความนี้เราสอนเซ็ตอัพแบบละเอียดในการเซ็ต SFTP เป็นหลักเพื่อเป็นการลองเล่น หากใครทำตาม อย่าลืม terminate เครื่องด้วย ไม่งั้นเสียตัง 555

ส่วน Airflow ก็เป็นคนคอยควบคุมในการ export BigQuery view มาที่ GCS แล้วค่อยต่อไปยัง SFTP ซึ่งทั้งขั้นตอนในเรื่อง security นั้นยังไม่รวมถึงการทำ whitelist IP ว่า Airflow ควรจะเป็นถูก allowed มาเข้าใช้ใน SFTP server เพียงแต่เป็น practice ให้ลองทำตามและทำความเข้าใจการคุยกันของ service ต่างๆ

Ref:

Comment and Feedback

Leave a comment

This site uses Akismet to reduce spam. Learn how your comment data is processed.