• Airflow 란?
    • "워크플로우"를 '프로그램적작성' 및 '스케쥴링' 및 '모니터링' 할수있게 해주는 오픈소스 플랫폼.
    • "DAGs (Directed Acyclic Graphs)" 라는 단위에 "워크플루우"를 처리를 하게 된다.
    • 특징
      • Dynamic : `파이프라인`을 코드기반의 다이나믹한 생성 및 설정을 할수있게 제공한다.
      • Extensible : 손쉽게 Operator 및 Executor 를 정의하고, 추상화된 환경에 딱 맞는 라이브러리 연동이 용의하다.
      • Elegant : `파이프라인`이 간결하고 명시적이다. (강력한 Jinja 템플릿 엔진을 기반에 "스크립트.sh" 적용 지원)
      • Scalalbe : 모듈화된 아키텍쳐로~ MQ를 사용하여, 초대량의 워커도 얼마든 오케스트레이션 할수있다. 
      • Not Data-Streaming Solution : 데이터를 옮기는 처리는 하지 않는다;;; (메타데이터를 교환하는 정도)
    • (2014.10 Airbnb에서 개발 -> 2015.06 오픈소스 공개 -> 2016.03 아파치 인큐베이팅 -> 2019.01 Top-Level 프로젝트 공표...)
  • 설치 및 시작
    • pip install apache-airflow
    • airflow initdb 및 checkdb 및 resetdb
    • airflow webserver -p 8080
    • airflow scheduler
      • 디폴트 홈 : ~/airflow
      • DAGs 위치 : ~/airflow/dags (지속적으로 체크됨)
      • airflow.cfg = 기본 설정값 파일 자동생성. (웹UI 상, Admin->Configuration 확인가능)
      • airflow.db = 디폴드 sqlite 데이터베이스.
      • (기본적 메타저장으로 sqlite 디비를 사용하는데... 동시접근을 지원하지 않아서, 빨리 맛보기용 정도의 수준임)
      • 웹서버 PID 파일 = "~/airflow/webserver.pid"
  • 튜토리얼
    • `파이프라인` 정의는 아래과 같은식 이다.
      • 1) DAG 인스턴스화 및 셋팅. (다양한 args 적용 가능)
      • 2) Task들 을 특정 Operator 으로 인스턴스화 및 셋팅.
      • 3) Task들 간의 디펜던시 셋팅.
    • Jinja 템플릿을 기반으로, 유용한 파라미터 및 매크로 등을 "스크립트.sh" 작성하여~ Task에 적용할수있다.
    • Test
      • python ~/airflow/dags/다그.py // 예외없이 실행되면... 문제없는것
      • airflow list_dags // 활성화된 DAGs 목록
      • airflow list_tasks `dag_id값` --tree // 해당 DAG의 Task들 목록
      • airflow test `dag_id값` ... // 단순 DAG 인스턴스 테스트 (디펜던시:O , DB상태:X)
      • airflow test `dag_id값` `task_id값` ... // 단순 단일 Task 인스턴스 테스트 (디펜던시:X , DB상태:X)
    • Backfill
      • 기본적으로 실행되지 않은 Task들을 과거값 기준으로 재실행. (다양한 커맨드 옵션 제공)
      • backfill 실행은 Task의 디펜던시를 고려하고, 로그를 기록하고, DB상태을 알려주는 등 다 함.
      • 웹서버를 띄워우면, 해당 처리과정을 시각적으로 확인 할수있습니다.
      • (코드상 depends_on_past, wait_for_downstream 등의 파라미터가 영향)
      • 예) airflow backfill test -s 2015-06-01 -e 2015-06-07
    • 강제 Trigger
      • 예) airflow trigger_dag test -e 2020-02-02
  • 아키텍쳐
    •  

      출처 : https://zzsza.github.io/data/2018/01/04/airflow-1/
    • Webserver : UI 제공. (workflow 상태표시 및 실행 및 수동제어 및 로그조회 등등)
    • Scheduler : Task들을 관리하고, DB에 기록하고, Worker에게 잡분배하는 등등...
      • (단순 순차적 실행만 가능한 "SequentialExecutor" 에서 벗어나, CeleryExecutor 등에~ 병렬처리 가능한...)
  • 주요개념
    • DAG :
      • 하나의 처리를 하게되는 workflow로써, Task들로 이루어 짐.
      • (기본 "example DAGs"들이 꼴붸기 싫으면, airflow.cfg의 load_examples=false 하면 된다고 함.ㅋ)
      • default_args로써 작성자 및 start_data 등등을 셋팅하고, dag_id 및 interval 등등을 정해서 생성 함.
      • Schedule 셋팅을 해두고, Scheduler를 실행해두면~ 실행됨.
    • DAG Runs :
      • "execution_data은 실행날짜가 아니라 주문번호다."
      • "굳이 시간으로 이해하고 싶다면 '예약을 잡으려고 시도한 시간' 이라고..."
        • 출처 : https://blog.naver.com/gyrbsdl18/221561318823
      • Task를 실행하려면, 해당 작업의 transction id를 남겨야 하는데~ 그것을 (task_id+exe_dt)로 사용하는듯...
      • 씪뽺 뭔 개같은 개븅뜩같이 뽥쀌~ 뭔솔 먼의미고... ㅎㅎ airbnb 망해라 181818 커커커컼
    • Task :
      • DAG 안에서 Operator를 활용해 인스턴스화 된 것.
      • BashOperator, PythonOperator 등등 각종 제공.
      • Task 간의 디펜던시는 t1.set_downstream(t2) 혹은 t1 >> t2 식으로 연결하면 된다.
    • Additional Functionality
      • XCOM : Task 간에 데이터를 공유할수있는 저장공간이 제공됨. (xcom_push(), xcom_pull(), ...)
      • Variables : json 파일 등으로, 환경변수를 셋팅 할수있도록 제공함.
      • Hooks : 외부 플랫폼 or 데이터베이스에 접근을 할수있도록 만들어둔 인터페이스 ...
      • ...
    • ...
  • 외부 Connection 연동
    • Airflow 와 MySQL, GoogleCloud, Slack 등등을 연동 할 수 있음...
  • ...
  • T!ps
    • 1) 도커 기반으로, 초간편 구축법
      • https://github.com/puckel/docker-airflow 참고.
      • # docker volume create --name=shared-airflow-vol
        # make sh -> chmod 777 /usr/local/airflow
        # airflow 공용(?)볼륨을 만들어서, 독립된 도커에 적용하니~ 잘되었다.
        volumes:
        	redis-data:
        		driver: local
        	postgres-data:
            		driver: local
        	airflow-vol:
        		external:
        			name: shared-airflow-vol
        
        services:
            postgres: # 비휘발적으로 /var/lib/postgresql/data 볼륨을 지정.
            redis: ...
            flower: ...
            
            webserver:
            scheduler:
            	environment:
              	 - LOAD_EX=n
              	 - FERNET_KEY=암호키
              	 - EXECUTOR=Celery
                volumes:
              	 - airflow-vol:/usr/local/airflow
              	 - ./airflow/dags:/usr/local/airflow/dags
                 # 기존의 airflow.cfg를 덮어써도 잘 되었다.
              	 - ./airflow/airflow.cfg:/usr/local/airflow/airflow.cfg
                 # requirements.txt 를 작성하여, 루트에 두니 잘 지원 되었다.
              	 - ./airflow/requirements.txt:/requirements.txt
              	 # https://github.com/teamclairvoyant/airflow-rest-api-plugin 활용함.
              	 - ./airflow/plugins:/usr/local/airflow/plugins
        	worker:
            	volumes:
                 - "
        	 - /var/run/docker.sock:/var/run/docker.sock
                command: worker --concurrency 1 --autoscale 1,1 -q 큐네임
    • 2) airflow.cfg
      • [core]
        • dags_folder = /usr/local/airflow/dags
        • base_log_folder = /usr/local/airflow/logs
        • parallelism = 32 #전반적인? DAGs의 task들을 처리하는 executor수?
        • dag_concurrency : 16 #scheduler가 DAG당 큐잉하는 동시성?
        • max_active_runs_per_dag : ? #각 DAG의 최대 실행 제한? (넘어도 되긴됬음)
      • [webserver]
        • authenticate = True
        • auth_backend = airflow.contrib.auth.backends.password_auth
        • # 어드민 페이지 "아이디//비번" 셋팅
        • # User는 그냥 DAG에 예제코드 짜서... 한번 실행 시킴.
      • [scheduler]
        • max_threads : 2 #병력적으로 dags 스케쥴링하는 scheduler 쓰레드수?
      • [celery]
        • worker_concurrency = 16
        • worker_autoscale = 16,12
    • 3) puckel/docker-airflow 이미지 커스텀 리빌드
      • 이미지 내부에 도커 및 도컴 을 사용하기 위해, FROM puckel/docker-airflow:1.10.9 기반으로~ DockerFile 작성!
      • 결과적으로 호스트의 /var/run/docker.sock 을 마운트하여... worker 에서 '컨테이트 프로세스'를 실행할수있었다.
      • (sudo 를 설치하고, 수정한 entrypoint.sh 를 덮어씌우고, 지정함)
    • 4) entrypoint.sh 커스텀
      • worker 경우시, sudo chmod 777 /var/run/docker.sock 을 하도록 하였다.
    • 5) DAG 관련
      • DAG.default_args
        • 'start_date': days_ago(0)
        • 'queue': 큐네임
        • 'depends_on_past' 및 'wait_for_downstream': ...
        • 'trigger_rule': ...
        • 'pool' 및 'pool_slots': 생성한 풀 이름. (생성때 슬롯수 지정)
        • 'priority_weight' 및 'weight_rule': ...
        • 'task_concurrency': ...
        • 'retries' 및 'retry_delay' : ...
        • on_success_callback 혹은 on_failure_callback 등등 : ...
      • Operator
        • BashOperator
        • PythonOperator
        • TriggerDagRunOperator
        • DockerOperator
        • ...
    • 6) 웹서버 airflow-rest-api-plugin 활용
      • ...
    • 7) task 분할과 순차적 처리 삽질...
      • 기본적으로... (trigger) -> Webserver -> pool -> Scheduler -> (slot오픈?) -> queue -> Worker 식으로 DAG의 task들이 처리 됨.
      • 한큐에 할 일을 2개(task1 >> task2) 으로 분리하고 여러번 트리거 하니... task1 만 여러개 큐잉 되었다.
      • (task1에서 병목이 되어, 모든 task1이 다 끝나야지~ task2가 그제서야 시작)
      • trigger_rule = dummy 로 하면, 해당 dag의 모든 task들를 -> 일단 pool에 다 붓는다.
      • 굳이 억지로 slots 을 1 으로 pool 을 만들고 -> priority_weight = 동일 & weight_rule = absolute 하면,
      • pool 에 쏫아부은 순차적으로 queue에 들어가는것 같아 보였다.
      • 음... 많이 테스트 하다 보니 아닌것 같다. (최초 pool에 부어지는 DAG 단위의 task 들에 순서쪽에 이슈;;)
    • 8) airflow variables 활용
      • 전역변수로써, 어드민 페이지에서 Variables 를 만들고! {{ var.value.이름 }} 식으로 사용할수있다.
      • 예) DockerOperator의 volumes에 해당값을 다이나믹하게 적용하려고 하니... 안되었다.
      • 삽질을 하다가... task1.template_fields = ('command', 'environment', 'container_name', 'volumes') 식으로~
      • Jinja 템플릿을 적용할 필드를 (디폴트값이 아닐경우) 따로 지정해야 되었다.

Celery 란?

   (다음화에 계속 ...)

-끝-

'MQ' 카테고리의 다른 글

SQS  (0) 2020.05.16
RabbitMQ (AMQP)  (0) 2020.04.30
Elastic Stack 란?  (1) 2020.04.12
flafka (flume + kafka)  (0) 2019.06.08
빅데이터와 하둡  (0) 2019.05.18

+ Recent posts