DevOps

[Airflow] SlackAPIPostOperator 설정

땅지원 2024. 10. 24. 13:59

이전 버전에서는 SlackAPIPostOperator 메소드 내에서 token 를 입력을 했어야 했지만 직접 token 을 입력해보았을 때, 에러가 발생하고 현재는 지원하지 않는 것 같다.

 

airflow.providers.slack.operators.slack — apache-airflow-providers-slack Documentation

 

airflow.apache.org

 

Post messages to a Slack channel.

slack = SlackAPIPostOperator(
    task_id="post_hello",
    dag=dag,
    text="hello there!",
    channel="#random",
)



해결방법 : Slack API Connection을 만들어주고 BaseHook 사용

self.slack_token = BaseHook.get_connection('slack_api_default').password


slack_operator.py

from airflow.hooks.base_hook import BaseHook
from airflow.operators.slack_operator import SlackAPIPostOperator
 
class SlackAlert:
    def __init__(self, channel):
        self.slack_channel = channel
        self.slack_token = BaseHook.get_connection('slack_api_default').password
 
    def slack_fail_alert(self, context):
        alert = SlackAPIPostOperator(
            task_id='slack_failed',
            channel=self.slack_channel,
            text="""
                :red_circle: Task Failed.
                *Task*: {task} 
                *Dag*: {dag}
                *Execution Time*: {exec_date} 
                *Log Url*: {log_url}
                """.format(
                    task=context.get('task_instance').task_id,
                    dag=context.get('task_instance').dag_id,
                    exec_date=context.get('execution_date'),
                    log_url=context.get('task_instance').log_url
                    )
                  )
        return alert.execute(context=context)


test_alert.py

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
 
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from airflow.providers.slack.notifications.slack import send_slack_notification
 
from slack_operator import SlackAlert
 
alert = SlackAlert('#일반-test-모니터링')
 
# DAG 기본 설정
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 10, 20),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_success_callback': alert.slack_fail_alert
}
 
# DAG 정의
dag = DAG(
    'Slack_test',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)
 
# 첫 번째 Bash 작업
task1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag,
)
 
# 세 번째 Bash 작업
task2 = BashOperator(
    task_id='print_hello',
    bash_command='echo "Hello World"',
    dag=dag,
)
 
# 작업 순서 정의
task1 >> task2