https://github.com/andreax79/airflow-code-editor
직접 마운트 된 airflow 디렉터리 ./dags 폴더에 넣어도 되지만 UI환경에서 바로 코드 수정을 할 수 있는 에디터가 존재
1. SQL을 이용한 DAG 구현
from datetime import timedelta
from airflow import DAG
import airflow.utils.dates
from urllib import request
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
# dag 정의
dag = DAG(
dag_id = "mssql_connection",
start_date = airflow.utils.dates.days_ago(1),
schedule_interval=None
)
start = DummyOperator(task_id="start")
end = DummyOperator(task_id="end")
# sql 입력
# mssql_create_sql = "CREATE TABLE airflow_test (a int)"
mssql_select_sql = "SELECT * FROM [dbo].[KT_DKT_INPUT_TRAIN]"
# mssql operator task 정의
create_table_mssql_task = MsSqlOperator(
task_id='mssql_select_sql',
mssql_conn_id='reco_mart',
database='AIDO_RECO_KT',
sql=mssql_select_sql,
params={"table":"airflow_test"},
dag=dag
)
start >> create_table_mssql_task >> end
2. Pythonoperator를 이용한 DAG 구현
from pathlib import Path
from airflow import DAG
import datetime as dt
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd
# DAG 정의
dag = DAG (
dag_id = "01_unscheduled",
start_date = dt.datetime(2022,4,18),
schedule_interval="@daily" # 매일 자정
)
fetch_events = BashOperator(
task_id="fetch_events",
bash_command=(
"mkdir -p /data/events && "
"curl -o /data/events.json http://events_api:5000/events"
),
dag=dag,
)
# 파이썬 함수 정의
def _calculate_stats(input_path,output_path):
"""
이벤트 통계 계산하기
"""
events = pd.read_json(input_path)
stats = events.groupby(["date","user"]).size().reset_index()
Path(output_path).parent.mkdir(exist_ok = True)
stats.to_csv(output_path,index=False)
# Task 정의 - Pythonoperator
calculate_stats = PythonOperator(
task_id="calculate_stats",
python_callable=_calculate_stats,
op_kwargs = {
"input_path":"data/events.json",
"output_path":"data/stats.csv",
},
dag=dag
)
fetch_events >> calculate_stats
3. ssh operator를 이용한 ssh 연결
from airflow.providers.ssh.hooks.ssh import SSHHook
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.models import Variable
# DAG 정의
dag = DAG(
dag_id="ssh_call",
description="sshcall",
start_date=airflow.utils.dates.days_ago(1),
schedule_interval=None,
)
# 원격 아이피 주소 입력
REMOTE_BIND_IP = Variable.get('SERVER_REMOTE_BIND_IP')
# ssh_hook 정의
ssh_hook = SSHHook(ssh_conn_id=None,remote_host=REMOTE_BIND_IP,username='user',password='passwd')
# ssh_opertor
ssh_opertor= SSHOperator(
ssh_hook = ssh_hook,
task_id = 'ssh_call',
command = 'python airflow_pipeline/ssh_test.py',
dag = dag
)
airflow api
- 외부에서 airflow 서버 내 dag 실행이 필요할 경우 사용
- airflow.cfg 파일에서 auth_backend = airflow.api.auth.backend.basic_auth 으로 수정 필요
- airflow webserver 서버 재시작 (sudo systemctl start airflow-webserver
- 호출 Url : http://서버아이피주소:8080/api/v1/dags/dagid(호출하려고 하는 dagid 입력) /dagRuns
'DevOps' 카테고리의 다른 글
[Airflow] SlackAPIPostOperator 설정 (0) | 2024.10.24 |
---|---|
[Airflow] Connection 연결 시 [TEST] 버튼 비활성화 해결방법 (0) | 2024.10.22 |
[Airflow] Airflow 설치 (Docker) (0) | 2024.10.22 |
Dockerfile을 이용한 코드에 의한 서버 구축 (0) | 2024.10.21 |
[Vector] Logstash 대체안 Vector (0) | 2024.04.19 |