이 시리즈는 비동기 작업 큐인 Celery를 활용해보는 시리즈가 되겠다.
1. Celery 란?
Celery 문서를 살펴보면 이렇게 정의한다.
Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.
It’s a task queue with focus on real-time processing, while also supporting task scheduling.
Celery has a large and diverse community of users and contributors, you should come join us on IRC or our mailing-list.
Celery is Open Source and licensed under the BSD License.
Celery는 이러한 시스템을 유지하는 데 필요한 도구를 운영에 제공하면서 방대한 양의 메시지를 처리할 수 있는 단순하고 유연하며 신뢰할 수 있는 분산 시스템입니다. 실시간 처리에 중점을 둔 작업 대기열이며, 작업 스케줄링도 지원합니다. 셀러리는 크고 다양한 사용자 및 기여자 커뮤니티를 가지고 있습니다. IRC 또는 우리의 메일링 리스트에 참여하십시오. Celery는 오픈 소스이며 BSD 라이선스에 따라 라이선스가 부여됩니다.
요약하면 Celery는 분산 시스템이며 비동기 Task Queue를 이용한 실시간 처리를 할 수 있다.
그렇다면 Task Queue란 무엇일까?
1.1 Task Queue
공식 문서를 살펴보면 아래와 같이 정의한다.
태스크 큐는 스레드 또는 시스템 간에 작업을 분배하는 메커니즘으로 사용됩니다.
태스크 큐의 입력은 태스크라고 하는 작업 단위입니다. 전용 작업자 프로세스는 수행할 새 작업을 위해 작업 대기열을 지속적으로 모니터링합니다.
Celery는 일반적으로 브로커(broker)를 사용하여 클라이언트와 작업자(worker) 사이를 중재하는 메시지를 통해 통신합니다. 작업을 시작하기 위해 클라이언트는 대기열에 메시지를 추가하고 브로커는 해당 메시지를 작업자에게 전달합니다.
Celery 시스템은 여러 작업자와 브로커로 구성되어 고가용성과 수평적 확장이 가능합니다.
즉 Task Queue에 적재되어 있는 Task를 브로커를 통해 작업자에게 전달한다.
여기 Worker는 Task를 처리하는 역할을 뜻하고, 브로커는 클라이언트와 Worker를 이어준다.
여기서 메시지와 Task는 비슷한 의미로 쓰인 거 같다.
2. 실습
기본 실습은 공식문서와 블로그를 참고하여 작성하였다. 일단 docker로 RabbitMQ Broker를 구성했다.(권장 사항인 거 같다.)
사실 공식 문서만 참고해도 가능하다. 따라서 나는 실습과정 중에서 안됐던 부분이나 혹시나 헷갈릴만한 부분을 위주로 설명하겠다.
2.1 개발환경에서 Worker 실행
나는 django settings의 디렉토리가 실습환경과는 달랐다.
config
│ │ ├── __init__.py
│ │ ├── settings
│ │ │ ├── base.py
│ │ │ ├── celery.py
│ │ │ ├── development.py
│ │ │ ├── __init__.py
│ │ │ ├── production.py
따라서 config.__init__.py 에 아래와 같이 작성했다.
# config.__init__.py
# 이렇게 설정해주면 Django가 시작할떄 Celery가 항상 import 되고
# shared_task 데코레이션이 Celery를 이용하게됩니다.
from config.settings.celery import app as celery_app
__all__ = ('celery_app',)
또한 config.settings.celery 는 아래와 같이 작성했다
# config.settings.celery.py
import os
from celery import Celery
# Celery 모듈을 위한 Django 기본세팅
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')
app = Celery('config', backend='rpc://', broker='pyamqp://localhost:5672')
# 여기서 문자열을 사용하는것은 작업자가가 자식 프로세스 직렬화 구성을 하지 않는것을 의미합니다.
# -namespace='CELERY' 의 의미는 셀러리와 관련된 모든 설정은 CELERY_ 라는 prefix로 시작함을 의미
app.config_from_object('django.conf:settings', namespace='CELERY')
# Django 에 등록된 모든 task 모듈을 로드합니다.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
이렇게 작성 후 worker를 실행했다.
celery -A config worker -l INFO
실행 시킨 뒤 add.delay(4, 4)를 실행했는데 여기서 또 하나의 문제가 발생했다.
2.2 로그 문제
사실 별 이슈는 아니었다. 참고한 블로그에서 task를 실행시켰을 때 로그에 결괏값이 찍혀야 된다고 쓰여 있었는데, 아무리 실행을 시켜도 결괏값이 찍히지 않았다.
stack overflow에서 해결법을 찾았다. 요약하자면 windows 에서는 celery 4.0 이상을 지원하지 않으므로 gevent 실행 풀을 사용해야 한다는 것이었다.
$ celery -A <app_name> worker -l info -P gevent
Pool 옵션으로 gevent를 입력하여 실행하니 다행히 나왔다.
gevent에 대한 정리는 이후 포스팅에서 진행하겠다.
참고
1. https://docs.celeryq.dev/en/stable/index.html
2. https://kimeuichan.github.io/posts/python-celery/
3. https://lucky516.tistory.com/2
4. https://shifu-oh.tistory.com/10
'Python' 카테고리의 다른 글
WSL2 환경에서 Poetry 사용하기 (0) | 2023.01.16 |
---|---|
[Python] GIL이 뭐죠? (0) | 2022.12.21 |
컴파일이란 (0) | 2022.12.07 |
Python 알아보기(1) - 파이썬의 특징 (2) | 2022.12.06 |
[Python] - 정규표현식을 이용한 문자열 처리 (0) | 2022.11.19 |