이전 버전에서는 SlackAPIPostOperator 메소드 내에서 token 를 입력을 했어야 했지만 직접 token 을 입력해보았을 때, 에러가 발생하고 현재는 지원하지 않는 것 같다.
Post messages to a Slack channel.
slack = SlackAPIPostOperator(
task_id="post_hello",
dag=dag,
text="hello there!",
channel="#random",
)
해결방법 : Slack API Connection을 만들어주고 BaseHook 사용
self.slack_token = BaseHook.get_connection('slack_api_default').password
slack_operator.py
from airflow.hooks.base_hook import BaseHook
from airflow.operators.slack_operator import SlackAPIPostOperator
class SlackAlert:
def __init__(self, channel):
self.slack_channel = channel
self.slack_token = BaseHook.get_connection('slack_api_default').password
def slack_fail_alert(self, context):
alert = SlackAPIPostOperator(
task_id='slack_failed',
channel=self.slack_channel,
text="""
:red_circle: Task Failed.
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url
)
)
return alert.execute(context=context)
test_alert.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
from airflow.providers.slack.notifications.slack import send_slack_notification
from slack_operator import SlackAlert
alert = SlackAlert('#일반-test-모니터링')
# DAG 기본 설정
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 10, 20),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
'on_success_callback': alert.slack_fail_alert
}
# DAG 정의
dag = DAG(
'Slack_test',
default_args=default_args,
description='A simple tutorial DAG',
schedule_interval=timedelta(days=1),
)
# 첫 번째 Bash 작업
task1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
# 세 번째 Bash 작업
task2 = BashOperator(
task_id='print_hello',
bash_command='echo "Hello World"',
dag=dag,
)
# 작업 순서 정의
task1 >> task2
'DevOps' 카테고리의 다른 글
[Airflow] DAG 생성 (0) | 2024.10.25 |
---|---|
[Airflow] Connection 연결 시 [TEST] 버튼 비활성화 해결방법 (0) | 2024.10.22 |
[Airflow] Airflow 설치 (Docker) (0) | 2024.10.22 |
Dockerfile을 이용한 코드에 의한 서버 구축 (0) | 2024.10.21 |
[Vector] Logstash 대체안 Vector (0) | 2024.04.19 |