땅지원
땅지원's Personal blog
땅지원
전체 방문자
오늘
어제
  • 전체 (353)
    • Frontend (2)
      • React (2)
    • Backend (90)
      • Java (16)
      • Python (19)
      • Spring (23)
      • Database (21)
      • Troubleshooting (8)
    • DevOps (27)
      • ELK (13)
    • CS (40)
    • OS (2)
      • Linux (2)
    • Algorithm (95)
      • concept (18)
      • Algorithm Problem (77)
    • 인공지능 (25)
      • 인공지능 (12)
      • 연구노트 (13)
    • 수업정리 (35)
      • 임베디드 시스템 (10)
      • 데이터통신 (17)
      • Linux (8)
    • 한국정보통신학회 (5)
      • 학술대회 (4)
      • 논문지 (1)
    • 수상기록 (8)
      • 수상기록 (6)
      • 특허 (2)
    • 삼성 청년 SW 아카데미 (6)
    • 42seoul (12)
    • Toy project (3)
    • 땅's 낙서장 (2)

블로그 메뉴

  • 홈
  • 태그
  • 방명록

공지사항

  • 20.11.6 BB21플러스 온라인 학술대회
  • 20.10.30 한국정보통신학회 온라인 학술대회

인기 글

태그

  • I
  • ㅗ
  • E
  • 이것이 리눅스다 with Rocky Linux9
  • D

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
땅지원

땅지원's Personal blog

DevOps

[Airflow] SlackAPIPostOperator 설정

2024. 10. 24. 13:59

이전 버전에서는 SlackAPIPostOperator 메소드 내에서 token 를 입력을 했어야 했지만 직접 token 을 입력해보았을 때, 에러가 발생하고 현재는 지원하지 않는 것 같다.

 

airflow.providers.slack.operators.slack — apache-airflow-providers-slack Documentation

 

airflow.apache.org

 

Post messages to a Slack channel.

slack = SlackAPIPostOperator(
    task_id="post_hello",
    dag=dag,
    text="hello there!",
    channel="#random",
)



해결방법 : Slack API Connection을 만들어주고 BaseHook 사용

self.slack_token = BaseHook.get_connection('slack_api_default').password


slack_operator.py

from airflow.hooks.base_hook import BaseHook
from airflow.operators.slack_operator import SlackAPIPostOperator
 
class SlackAlert:
    def __init__(self, channel):
        self.slack_channel = channel
        self.slack_token = BaseHook.get_connection('slack_api_default').password
 
    def slack_fail_alert(self, context):
        alert = SlackAPIPostOperator(
            task_id='slack_failed',
            channel=self.slack_channel,
            text="""
                :red_circle: Task Failed.
                *Task*: {task} 
                *Dag*: {dag}
                *Execution Time*: {exec_date} 
                *Log Url*: {log_url}
                """.format(
                    task=context.get('task_instance').task_id,
                    dag=context.get('task_instance').dag_id,
                    exec_date=context.get('execution_date'),
                    log_url=context.get('task_instance').log_url
                    )
                  )
        return alert.execute(context=context)


test_alert.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
 
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from airflow.providers.slack.notifications.slack import send_slack_notification
 
from slack_operator import SlackAlert
 
alert = SlackAlert('#일반-test-모니터링')
 
# DAG 기본 설정
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 10, 20),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_success_callback': alert.slack_fail_alert
}
 
# DAG 정의
dag = DAG(
    'Slack_test',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)
 
# 첫 번째 Bash 작업
task1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)
 
# 세 번째 Bash 작업
task2 = BashOperator(
    task_id='print_hello',
    bash_command='echo "Hello World"',
    dag=dag,
)
 
# 작업 순서 정의
task1 >> task2

'DevOps' 카테고리의 다른 글

[Airflow] DAG 생성  (0) 2024.10.25
[Airflow] Connection 연결 시 [TEST] 버튼 비활성화 해결방법  (0) 2024.10.22
[Airflow] Airflow 설치 (Docker)  (0) 2024.10.22
Dockerfile을 이용한 코드에 의한 서버 구축  (0) 2024.10.21
[Vector] Logstash 대체안 Vector  (0) 2024.04.19
    'DevOps' 카테고리의 다른 글
    • [Airflow] DAG 생성
    • [Airflow] Connection 연결 시 [TEST] 버튼 비활성화 해결방법
    • [Airflow] Airflow 설치 (Docker)
    • Dockerfile을 이용한 코드에 의한 서버 구축
    땅지원
    땅지원
    신입 개발자의 우당탕탕 기술 블로그

    티스토리툴바