땅지원
땅지원's Personal blog
땅지원
전체 방문자
오늘
어제
  • 전체 (353)
    • Frontend (2)
      • React (2)
    • Backend (90)
      • Java (16)
      • Python (19)
      • Spring (23)
      • Database (21)
      • Troubleshooting (8)
    • DevOps (27)
      • ELK (13)
    • CS (40)
    • OS (2)
      • Linux (2)
    • Algorithm (95)
      • concept (18)
      • Algorithm Problem (77)
    • 인공지능 (25)
      • 인공지능 (12)
      • 연구노트 (13)
    • 수업정리 (35)
      • 임베디드 시스템 (10)
      • 데이터통신 (17)
      • Linux (8)
    • 한국정보통신학회 (5)
      • 학술대회 (4)
      • 논문지 (1)
    • 수상기록 (8)
      • 수상기록 (6)
      • 특허 (2)
    • 삼성 청년 SW 아카데미 (6)
    • 42seoul (12)
    • Toy project (3)
    • 땅's 낙서장 (2)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

  • 20.11.6 BB21플러스 온라인 학술대회
  • 20.10.30 한국정보통신학회 온라인 학술대회

인기 글

태그

  • I
  • E
  • 이것이 리눅스다 with Rocky Linux9
  • ㅗ
  • D

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
땅지원

땅지원's Personal blog

DevOps

[Airflow] DAG 생성

2024. 10. 25. 10:15

https://github.com/andreax79/airflow-code-editor

 

GitHub - andreax79/airflow-code-editor: A plugin for Apache Airflow that allows you to edit DAGs in browser

A plugin for Apache Airflow that allows you to edit DAGs in browser - andreax79/airflow-code-editor

github.com

직접 마운트 된 airflow 디렉터리 ./dags 폴더에 넣어도 되지만 UI환경에서 바로 코드 수정을 할 수 있는 에디터가 존재

 

1. SQL을 이용한 DAG 구현

from datetime import timedelta
from airflow import DAG
import airflow.utils.dates
from urllib import request
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# dag 정의 
dag = DAG(
    dag_id = "mssql_connection",
    start_date = airflow.utils.dates.days_ago(1),
    schedule_interval=None
)

start = DummyOperator(task_id="start")
end = DummyOperator(task_id="end")

# sql 입력 
# mssql_create_sql = "CREATE TABLE airflow_test (a int)"
mssql_select_sql = "SELECT * FROM [dbo].[KT_DKT_INPUT_TRAIN]"

# mssql operator task 정의
create_table_mssql_task = MsSqlOperator(
            task_id='mssql_select_sql',
            mssql_conn_id='reco_mart',
            database='AIDO_RECO_KT',
            sql=mssql_select_sql,
            params={"table":"airflow_test"},
            dag=dag
        )

start >> create_table_mssql_task >> end

 

 

2. Pythonoperator를 이용한 DAG 구현

from pathlib import Path
from airflow import DAG
import datetime as dt
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import pandas as pd

# DAG 정의
dag = DAG (
    dag_id = "01_unscheduled",
    start_date = dt.datetime(2022,4,18),
    schedule_interval="@daily" # 매일 자정 
)

fetch_events = BashOperator(
    task_id="fetch_events",
    bash_command=(
        "mkdir -p /data/events && "
        "curl -o /data/events.json http://events_api:5000/events"
    ),
    dag=dag,
)

# 파이썬 함수 정의 
def _calculate_stats(input_path,output_path):
    """
    이벤트 통계 계산하기
    """

    events = pd.read_json(input_path)
    stats = events.groupby(["date","user"]).size().reset_index()
    Path(output_path).parent.mkdir(exist_ok = True)
    stats.to_csv(output_path,index=False)

# Task 정의 - Pythonoperator 
calculate_stats = PythonOperator(
    task_id="calculate_stats",
    python_callable=_calculate_stats,
    op_kwargs = {
            "input_path":"data/events.json",
            "output_path":"data/stats.csv",
    },
    dag=dag
)

fetch_events >> calculate_stats

 

3. ssh operator를 이용한 ssh 연결

from airflow.providers.ssh.hooks.ssh import SSHHook
import airflow.utils.dates
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.models import Variable

# DAG 정의 
dag = DAG(
    dag_id="ssh_call",
    description="sshcall",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval=None,
)

# 원격 아이피 주소 입력 
REMOTE_BIND_IP = Variable.get('SERVER_REMOTE_BIND_IP')


# ssh_hook 정의
ssh_hook = SSHHook(ssh_conn_id=None,remote_host=REMOTE_BIND_IP,username='user',password='passwd')

# ssh_opertor 
ssh_opertor= SSHOperator(
    ssh_hook = ssh_hook,
    task_id = 'ssh_call',
    command = 'python airflow_pipeline/ssh_test.py',
    dag = dag

)

 

 

airflow api 

- 외부에서 airflow 서버 내 dag 실행이 필요할 경우 사용

- airflow.cfg 파일에서 auth_backend = airflow.api.auth.backend.basic_auth 으로 수정 필요

- airflow webserver 서버 재시작 (sudo systemctl start airflow-webserver

- 호출 Url : http://서버아이피주소:8080/api/v1/dags/dagid(호출하려고 하는 dagid 입력) /dagRuns

'DevOps' 카테고리의 다른 글

[Airflow] SlackAPIPostOperator 설정  (0) 2024.10.24
[Airflow] Connection 연결 시 [TEST] 버튼 비활성화 해결방법  (0) 2024.10.22
[Airflow] Airflow 설치 (Docker)  (0) 2024.10.22
Dockerfile을 이용한 코드에 의한 서버 구축  (0) 2024.10.21
[Vector] Logstash 대체안 Vector  (0) 2024.04.19
    'DevOps' 카테고리의 다른 글
    • [Airflow] SlackAPIPostOperator 설정
    • [Airflow] Connection 연결 시 [TEST] 버튼 비활성화 해결방법
    • [Airflow] Airflow 설치 (Docker)
    • Dockerfile을 이용한 코드에 의한 서버 구축
    땅지원
    땅지원
    신입 개발자의 우당탕탕 기술 블로그

    티스토리툴바