실무에서 Kubernetes Executor 만큼 많이 사용되는 Celery Executor의 아키텍처에 대해 정리해보자.
Celery Executor
Celery Executor는 Airflow에서 작업(Task)를 여러 Worker에게 분산하여 처리하는 방식이다.
Celery라는 분산 작업 큐 시스템을 사용하여 작업을 관리하고 분배한다.
Component
- Web Server
: Web UI를 제공하며, 사용자가 DAG과 Task의 상태를 모니터링하고 관리할 수 있게 한다. Task를 수동으로 트리거할 수도 있다. - Workers
:실제로 할당된 Task를 실행한다. Celery Executor는 여러 Worker에게 Task를 분산하여 할당한다. - Database
: 메타데이터 데이터베이스로, DAG, Task, 스케줄링 정보, 실행 상태, 로그 등을 저장한다. Scheduler와 Webserver는 이 데이터베이스를 통해 Airflow의 상태를 추적하고 관리한다. - Scheduler
: DAG에 정의된 작업을 스케줄링하고, 어떤 Task가 언제 실행돼야 하는지를 결정한다. DAG 파일을 읽어, Task를 Queue Broker에게 보낼 준비를 한다. - DAG Directory(Files)
: DAG 파일들이 저장된 디렉토리. Task와 Workflow의 정의가 포함된 파일들을 스케줄러가 읽어 Task를 스케줄링한다. - Celery
: 스케줄링하는 Queue 사용 방식
Celery Components
- Queue Broker
: Celery 시스템의 핵심으로 실행해야할 Command를 저장, 스케줄러가 작업(Task)를 큐에 넣으면, Queue Broker가 이를 관리하고, 적절한 Worker에게 작업을 할당한다. 주로 Redis나 RabbitMQ와 같은 메시지 큐 시스템을 사용한다. - Result Backend
: Worker가 실행한 Task(완료된 Command)의 결과를 저장한다. 스케줄러는 이 결과를 통해 다음 작업을 결정한다. - Flower
: Celery Queue 및 작업의 상태를 모니터링하는 Web UI
Work Flow
- Web server -> Workers
: Web server는 작업 실행 로그를 가져오기 위해 Workers와 통신한다. (이를 통해, 사용자는 웹 UI에서 작업의 실행 결과를 확인할 수 있다.) - Web server -> DAG Files
: Web server는 DAG 파일 구조를 확인해 사용자에게 DAG의 전체 구조를 보여준다. 이를 통해 DAG의 워크플로우가 어떻게 구성되어 있는지 시각적으로 확인할 수 있다. - Web server -> Database
: Web server는 (메타데이터)데이터베이스에서 작업의 상태를 가져온다. 이를 통해 사용자는 각 작업의 현재 어떤 상태(ex. 대기중, 실행중, 완료됨)에 있는지 확인할 수 있다. - Workers -> DAG files
: Worker는 DAG 파일을 확인해 어떤 작업을 수행해야 하는지 알아내고, 그 작업을 실행한다. - Workers -> Database
: Worker는 데이터베이스와 통신해 연결 설정(Connection), 변수(variables), XCOM(작업 간 데이터 교환)과 관련된 정보를 가져오고 저장한다. - Workers -> Celery's result backend
: Worker는 Celery의 result backend에 작업의 상태(ex. success, fail 등)를 저장한다. 이를 통해 작업의 최종 결과가 기록된다. - Workers -> Celery's broker : Subscribe
: Worker는 Celery의 broker에 실행될 명령(Command)을 저장한다. - Scheduler -> DAG Files
: Scheduler는 DAG 파일을 (1분 마다) 확인해 어떤 작업이 언제 실행돼야 하는지 결정하고, 해당 작업을 실행하도록 한다. - Scheduler -> Database
: Scheduler는 (메타데이터)데이터베이스에 DAG 실행 및 관련 작업의 정보를 저장한다. 어떤 DAG이 언제 실행됐고, 어떤 작업이 수행됐는지 기록된다. - Scheduler -> Celery's result backend
: Scheduler는 Celery의 result backend에서 완료된 작업의 상태를 가져와 후속 작업을 스케줄링하거나 관리한다. - Scheduler -> Celery's broker : Publish
: Scheduler는 Celery의 broker에 실행항 명령(Command)를 넣고, 이 명령들은 작업을 수행하기 위해 Worker로 전달된다.
Task Execution Process
프로세스
- ScheduleProcess: 작업(Task)를 처리하고, CeleryExecutor를 사용해 작업을 실행한다.
- WorkerProcess: 대기열(Queue)를 관찰하면서, 새로운 작업이 나타나는지 기다린다.
-> 초기, 이 두 개의 Process가 실행중이다.
데이터베이스
- (Celery의)QueueBroker: 작업 대기열(Task Queue)을 관리하는 데이터베이스
- (Celery의)ResultBackend: 작업의 결과와 상태를 저장하는 데이터베이스
작업 흐름
- SchedulerProcess는 DAG Files를 확인해, 실행할 작업(Task)를 발견하면 QueueBroker에 해당 작업을 보낸다.
- SchedulerProcess는 작업이 완료됐는지 확인하기 위해 ResultBackend에 주기적으로 작업 상태를 조회한다.
- QueueBroker는 작업이 Queue(대기열)에 들어오면, 이를 WorkerProcess에게 전달한다.
- WorkerProcess는 전달받은 작업(Task)을 하나의 WorkerChildProcess에 할당한다.
- WorkerChildProcess는 할당된 작업을 처리하기 위해 LocalTaskJobProcess라는 새로운 프로세스를 생성한다.
- LocalTaskJobProcess의 로직은 LocalTaskJob 클래스에 의해 정의되며, 이 프로세스는 TaskRunner를 사용해 새로운 프로세스(RawTaskProcess)를 시작한다.
- RawTaskProcess가 실제로 사용자의 코드를 실행하는 역할을 한다.
- 작업을 완료하면 LocalTaskProcess를 종료한다.
- 작업을 완료하면 WorekrChildProcess를 종료한다.
- WorkerChildProcess는 작업이 끝났다는 것을 WorkerProcess에 알리고, 다음 작업을 처리할 준비가 됐음을 알린다.
- WorkerProcess는 작업 상태 정보를 ResultBackend에 저장한다.
- Celery 상태 정보를 SchedulerProcess에 보낸다.
- SchedulerProcess가 ResultBackend에 작업 상태를 다시 조회할 때, 그 작업의 상태 정보를 얻는다.
'Data Engineering > Airflow' 카테고리의 다른 글
Airflow 살펴보기 (0) | 2023.09.11 |
---|