- Airflow 란?
- "워크플로우"를 '프로그램적작성' 및 '스케쥴링' 및 '모니터링' 할수있게 해주는 오픈소스 플랫폼.
- "DAGs (Directed Acyclic Graphs)" 라는 단위에 "워크플루우"를 처리를 하게 된다.
- 특징
- Dynamic : `파이프라인`을 코드기반의 다이나믹한 생성 및 설정을 할수있게 제공한다.
- Extensible : 손쉽게 Operator 및 Executor 를 정의하고, 추상화된 환경에 딱 맞는 라이브러리 연동이 용의하다.
- Elegant : `파이프라인`이 간결하고 명시적이다. (강력한 Jinja 템플릿 엔진을 기반에 "스크립트.sh" 적용 지원)
- Scalalbe : 모듈화된 아키텍쳐로~ MQ를 사용하여, 초대량의 워커도 얼마든 오케스트레이션 할수있다.
- Not Data-Streaming Solution : 데이터를 옮기는 처리는 하지 않는다;;; (메타데이터를 교환하는 정도)
- (2014.10 Airbnb에서 개발 -> 2015.06 오픈소스 공개 -> 2016.03 아파치 인큐베이팅 -> 2019.01 Top-Level 프로젝트 공표...)
- 설치 및 시작
- pip install apache-airflow
- airflow initdb 및 checkdb 및 resetdb
- airflow webserver -p 8080
- airflow scheduler
- 디폴트 홈 : ~/airflow
- DAGs 위치 : ~/airflow/dags (지속적으로 체크됨)
- airflow.cfg = 기본 설정값 파일 자동생성. (웹UI 상, Admin->Configuration 확인가능)
- airflow.db = 디폴드 sqlite 데이터베이스.
- (기본적 메타저장으로 sqlite 디비를 사용하는데... 동시접근을 지원하지 않아서, 빨리 맛보기용 정도의 수준임)
- 웹서버 PID 파일 = "~/airflow/webserver.pid"
- 튜토리얼
- `파이프라인` 정의는 아래과 같은식 이다.
- 1) DAG 인스턴스화 및 셋팅. (다양한 args 적용 가능)
- 2) Task들 을 특정 Operator 으로 인스턴스화 및 셋팅.
- 3) Task들 간의 디펜던시 셋팅.
- Jinja 템플릿을 기반으로, 유용한 파라미터 및 매크로 등을 "스크립트.sh" 작성하여~ Task에 적용할수있다.
- Test
- python ~/airflow/dags/다그.py // 예외없이 실행되면... 문제없는것
- airflow list_dags // 활성화된 DAGs 목록
- airflow list_tasks `dag_id값` --tree // 해당 DAG의 Task들 목록
- airflow test `dag_id값` ... // 단순 DAG 인스턴스 테스트 (디펜던시:O , DB상태:X)
- airflow test `dag_id값` `task_id값` ... // 단순 단일 Task 인스턴스 테스트 (디펜던시:X , DB상태:X)
- Backfill
- 기본적으로 실행되지 않은 Task들을 과거값 기준으로 재실행. (다양한 커맨드 옵션 제공)
- backfill 실행은 Task의 디펜던시를 고려하고, 로그를 기록하고, DB상태을 알려주는 등 다 함.
- 웹서버를 띄워우면, 해당 처리과정을 시각적으로 확인 할수있습니다.
- (코드상 depends_on_past, wait_for_downstream 등의 파라미터가 영향)
- 예) airflow backfill test -s 2015-06-01 -e 2015-06-07
- 강제 Trigger
- 예) airflow trigger_dag test -e 2020-02-02
- `파이프라인` 정의는 아래과 같은식 이다.
- 아키텍쳐
-
- Webserver : UI 제공. (workflow 상태표시 및 실행 및 수동제어 및 로그조회 등등)
- Scheduler : Task들을 관리하고, DB에 기록하고, Worker에게 잡분배하는 등등...
- (단순 순차적 실행만 가능한 "SequentialExecutor" 에서 벗어나, CeleryExecutor 등에~ 병렬처리 가능한...)
-
- 주요개념
- DAG :
- 하나의 처리를 하게되는 workflow로써, Task들로 이루어 짐.
- (기본 "example DAGs"들이 꼴붸기 싫으면, airflow.cfg의 load_examples=false 하면 된다고 함.ㅋ)
- default_args로써 작성자 및 start_data 등등을 셋팅하고, dag_id 및 interval 등등을 정해서 생성 함.
- Schedule 셋팅을 해두고, Scheduler를 실행해두면~ 실행됨.
- DAG Runs :
- "execution_data은 실행날짜가 아니라 주문번호다."
- "굳이 시간으로 이해하고 싶다면 '예약을 잡으려고 시도한 시간' 이라고..."
- 출처 : https://blog.naver.com/gyrbsdl18/221561318823
- Task를 실행하려면, 해당 작업의 transction id를 남겨야 하는데~ 그것을 (task_id+exe_dt)로 사용하는듯...
씪뽺 뭔 개같은 개븅뜩같이 뽥쀌~ 뭔솔 먼의미고... ㅎㅎ airbnb 망해라 181818 커커커컼
- Task :
- DAG 안에서 Operator를 활용해 인스턴스화 된 것.
- BashOperator, PythonOperator 등등 각종 제공.
- Task 간의 디펜던시는 t1.set_downstream(t2) 혹은 t1 >> t2 식으로 연결하면 된다.
- Additional Functionality
- XCOM : Task 간에 데이터를 공유할수있는 저장공간이 제공됨. (xcom_push(), xcom_pull(), ...)
- Variables : json 파일 등으로, 환경변수를 셋팅 할수있도록 제공함.
- Hooks : 외부 플랫폼 or 데이터베이스에 접근을 할수있도록 만들어둔 인터페이스 ...
- ...
- ...
- DAG :
- 외부 Connection 연동
- Airflow 와 MySQL, GoogleCloud, Slack 등등을 연동 할 수 있음...
- ...
- T!ps
- 1) 도커 기반으로, 초간편 구축법
- https://github.com/puckel/docker-airflow 참고.
-
# docker volume create --name=shared-airflow-vol # make sh -> chmod 777 /usr/local/airflow # airflow 공용(?)볼륨을 만들어서, 독립된 도커에 적용하니~ 잘되었다. volumes: redis-data: driver: local postgres-data: driver: local airflow-vol: external: name: shared-airflow-vol services: postgres: # 비휘발적으로 /var/lib/postgresql/data 볼륨을 지정. redis: ... flower: ... webserver: scheduler: environment: - LOAD_EX=n - FERNET_KEY=암호키 - EXECUTOR=Celery volumes: - airflow-vol:/usr/local/airflow - ./airflow/dags:/usr/local/airflow/dags # 기존의 airflow.cfg를 덮어써도 잘 되었다. - ./airflow/airflow.cfg:/usr/local/airflow/airflow.cfg # requirements.txt 를 작성하여, 루트에 두니 잘 지원 되었다. - ./airflow/requirements.txt:/requirements.txt # https://github.com/teamclairvoyant/airflow-rest-api-plugin 활용함. - ./airflow/plugins:/usr/local/airflow/plugins worker: volumes: - " - /var/run/docker.sock:/var/run/docker.sock command: worker --concurrency 1 --autoscale 1,1 -q 큐네임
- 2) airflow.cfg
- [core]
- dags_folder = /usr/local/airflow/dags
- base_log_folder = /usr/local/airflow/logs
- parallelism = 32 #전반적인? DAGs의 task들을 처리하는 executor수?
- dag_concurrency : 16 #scheduler가 DAG당 큐잉하는 동시성?
- max_active_runs_per_dag : ? #각 DAG의 최대 실행 제한? (넘어도 되긴됬음)
- [webserver]
- authenticate = True
- auth_backend = airflow.contrib.auth.backends.password_auth
- # 어드민 페이지 "아이디//비번" 셋팅
- # User는 그냥 DAG에 예제코드 짜서... 한번 실행 시킴.
- [scheduler]
- max_threads : 2 #병력적으로 dags 스케쥴링하는 scheduler 쓰레드수?
- [celery]
- worker_concurrency = 16
- worker_autoscale = 16,12
- [core]
- 3) puckel/docker-airflow 이미지 커스텀 리빌드
- 이미지 내부에 도커 및 도컴 을 사용하기 위해, FROM puckel/docker-airflow:1.10.9 기반으로~ DockerFile 작성!
- 결과적으로 호스트의 /var/run/docker.sock 을 마운트하여... worker 에서 '컨테이트 프로세스'를 실행할수있었다.
- (sudo 를 설치하고, 수정한 entrypoint.sh 를 덮어씌우고, 지정함)
- 4) entrypoint.sh 커스텀
- worker 경우시, sudo chmod 777 /var/run/docker.sock 을 하도록 하였다.
- 5) DAG 관련
- DAG.default_args
- 'start_date': days_ago(0)
- 'queue': 큐네임
- 'depends_on_past' 및 'wait_for_downstream': ...
- 'trigger_rule': ...
- 'pool' 및 'pool_slots': 생성한 풀 이름. (생성때 슬롯수 지정)
- 'priority_weight' 및 'weight_rule': ...
- 'task_concurrency': ...
- 'retries' 및 'retry_delay' : ...
- on_success_callback 혹은 on_failure_callback 등등 : ...
- Operator
- BashOperator
- PythonOperator
- TriggerDagRunOperator
- DockerOperator
- ...
- DAG.default_args
- 6) 웹서버 airflow-rest-api-plugin 활용
- ...
- 7) task 분할과 순차적 처리 삽질...
- 기본적으로... (trigger) -> Webserver -> pool -> Scheduler -> (slot오픈?) -> queue -> Worker 식으로 DAG의 task들이 처리 됨.
- 한큐에 할 일을 2개(task1 >> task2) 으로 분리하고 여러번 트리거 하니... task1 만 여러개 큐잉 되었다.
- (task1에서 병목이 되어, 모든 task1이 다 끝나야지~ task2가 그제서야 시작)
- trigger_rule = dummy 로 하면, 해당 dag의 모든 task들를 -> 일단 pool에 다 붓는다.
굳이 억지로 slots 을 1 으로 pool 을 만들고 -> priority_weight = 동일 & weight_rule = absolute 하면,pool 에 쏫아부은 순차적으로 queue에 들어가는것 같아 보였다.- 음... 많이 테스트 하다 보니 아닌것 같다. (최초 pool에 부어지는 DAG 단위의 task 들에 순서쪽에 이슈;;)
- 8) airflow variables 활용
- 전역변수로써, 어드민 페이지에서 Variables 를 만들고! {{ var.value.이름 }} 식으로 사용할수있다.
- 예) DockerOperator의 volumes에 해당값을 다이나믹하게 적용하려고 하니... 안되었다.
- 삽질을 하다가... task1.template_fields = ('command', 'environment', 'container_name', 'volumes') 식으로~
- Jinja 템플릿을 적용할 필드를 (디폴트값이 아닐경우) 따로 지정해야 되었다.
- 1) 도커 기반으로, 초간편 구축법
Celery 란?
(다음화에 계속 ...)
-끝-
'MQ' 카테고리의 다른 글
SQS (0) | 2020.05.16 |
---|---|
RabbitMQ (AMQP) (0) | 2020.04.30 |
Elastic Stack 란? (1) | 2020.04.12 |
flafka (flume + kafka) (0) | 2019.06.08 |
빅데이터와 하둡 (0) | 2019.05.18 |