Celery 는 파이썬 비동기 실행을 관리하는 모듈 입니다. 기본적으로 3가지 Process 가 실행 되어야 합니다. 1. Backend 실행 (Django with Redis), 2. Celery Worker (Celery 상태확인 스크립트), 3. Celery Beat (Celery Beat 반복실행 관리 스크립트) 이중 3번째는 Django Shell Script 를 사용하여 사용자가 직접 명령하는 방식으로 Test 를 진행 할 수 있습니다.

Learn Web Development with Python by Gaston C. Hillar


Contents

Redis

testing link

Celery 는 파이썬 실행함수 비동기 관리모듈 입니다. 앞의 그림과 같이 Message Broker (Redis, RabbitMQ) 를 필요로 하는데 예시에서는 Redis 를 사용 하겠습니다. Redis 의 개념 및 활용에 대한 내용은 Redis, Celery 조합으로 비동기 작업 을 확인하면 도움이 됩니다.

$ sudo apt-get install redis-server
$ pip install Django "celery[redis]"

Django Setting

Django 서비스와 함께 Celery 를 실행하는 예제 내용은 Celery 공식 문서 를 참고 하였습니다. Celery 설정 내용은 Django 의 settings.py 대신 celery.py 에서 아래와 같이 관리하는 방법이 독립적 관리에도 용이하고, 오작동을 피할 수 있었습니다.

# mysite/celery.py
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'server.settings')
app = Celery(
    'celery_test',
    broker='redis://localhost:6379',
    backend='redis://localhost:6379',
)
app.config_from_object('django.conf:settings', namespace='CELERY')
app.conf.update(
    task_serializer='json',
    accept_content=['application/json'],
    result_serializer='json',
    timezone=settings.TIME_ZONE,
    enable_utc=False,
)
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

Celery를 운영하는데 위처럼 많은 설정이 필요 합니다. 메세지로 Celery 를 관리하는 만큼 Broker 연결은 필수이고, result backend 는 선택 사항 입니다.

Testing

tasks.py 에 작성한 내용의 테스트를 진행하기 위해서 1) Django Server, 2) Celery worker, 3) Django Shell Script 3가지를 함께 띄워야 합니다. 경우에 따라 object has no attribute '_get_task_meta_for' 오류 를 출력하는 경우가 있습니다. 이는 Celery Backend 가 실행되지 않아서 발생하는 경우로써, Django Server 를 실행하지 않고 있거나, CeleryBackend 설정이 제대로 작동하는 지를 확인하면 해결 됩니다.

# notification/tasks.py
import time
from celery import shared_task

@shared_task
def sum(x, y):
    time.sleep(3)
    return x + y

@shared_task 데코레이터는 Celery 에서 관리할 함수들을 등록하게 되고, Celery Worker 에서 목록이 노출되고 있는지를 확인합니다. 앞에서 언급한 3개의 실행 내용을 확인하면 다음과 같이 테스트를 진행 합니다.

Celery 에 등록된 Task 등록 함수를 호출하는 방법으로는 delay() 메소드가 있습니다. apply_async() 메소드의 축소버젼 입니다. 그리고 status 메서드로 Task 등록 함수의 동작 상태를 확인할 수 있습니다.

In [1]: from app.tasks import sum
In [2]: sum(10,2)
Out[2]: 12

In [3]: t = sum.delay(2,4)
In [4]: t.status
Out[4]: 'PENDING'

In [5]: t.status
Out[5]: 'SUCCESS'

Celery Beat

앞에서 살펴본 Test 환경에서 3) Django Shell Script 를 통해서 사용자가 직접 명령을 내리는 경우도 있지만 Celery Beat 명령을 사용하면 보다 편리하게 동작을 확인 할 수 있습니다. 이 내용은 공식문서의 periodic-tasks 를 참고 하였습니다.

#mysite/celery.py
app.conf.beat_schedule = {
    'add-every-3-seconds': {
        'task': 'app.tasks.sum',
        'schedule': 3.0,
        'args': (16, 16),
        'args': {x:16, y:16},
    },
}

내용을 추가한 뒤, Django Shell 스크립트 대신 $ celery -A server beat -l info 를 실행 합니다. 진행 내용은 Celery Worker 터미널에서 확인 할 수 있습니다. args 의 속성값은 tuple 을 적용하거나, 파라미터에 직접 변수들을 적용하는 방법으로 dict() 객체를 사용할 수 있습니다.

$ celery -A server beat -l info
celery beat v5.2.3 (dawn-chorus) is starting.
Configuration ->
    . broker -> redis://localhost:6379//
[2000-01-01 12:00:00,000: INFO/MainProcess] beat: Starting...

CronTab

Celery 실행을 특정한 날짜 조건에 맞춰서 실행하도록 명령하는 내용 입니다. 상세한 설정 예시들은 아래의 표에 정리해 놓았습니다.

app.conf.beat_schedule = {
    'add-every-3-seconds': {
        'task': 'app.tasks.sum',
        # 매주 월요일 7시 30분에 실행
        'schedule': crontab(hour=12, minute=45, day_of_week=1)
        'args': (16, 16)
    },
}
예제 설명 소스코드 crontab()
1분 간격 ()
15분 간격 (minute=’*/15’)
매일 자정 (minute=0, hour=0)
3배수 실행 (minute=0, hour=’*/3’)
3배수 실행 (minute=0,hour=’0,3,6,9,12,15,18,21’)
일요일 1분 간격 (day_of_week=’sunday’)
일요일 1분 간격 (minute=’‘,hour=’‘,day_of_week=’sun’)
목,금 3-4am,5-6pm,10-11pm 10분간격 (minute=’*/10’, hour=’3,17,22’,day_of_week=’thu,fri’)
짝수시간 & 3배수 실행 (minute=0, hour=’/2,/3’)
5배수 실행 (minute=0, hour=’*/5’)
오전8시~오후5시 3배수 (minute=0, hour=’*/3,8-17’)
매월 두번째 날 (0, 0, day_of_month=’2’)
짝수 날에 실행 (0, 0, day_of_month=’2-30/3’)
매달 첫째주, 셋째주 (0, 0, day_of_month=’1-7,15-21’)
매년 5월11일 (0, 0, day_of_month=’11’,month_of_year=’5’)
4분기의 첫달 실행 (0, 0, month_of_year=’*/3’)

Celery Monitoring 1

터미널에서 직접 Celery 의 기본 Event 모니터링 활성화 명령은 다음과 같습니다.

$ celery -A server control enable_events
->  celery@ubuntu: OK
        task events enabled
$ celery -A server events

Celery Monitoring 2

flower 모듈 을 사용한 모니터링 실행방법은 다음과 같습니다. 스크립트를 실행한 뒤 http://localhost:5555 에 접속하면 모니터링 화면을 볼 수 있습니다.

$ celery -A server --broker=redis://localhost:6379// flower
[I 220321 12:00:00 command:100] Visit me at http://localhost:5555
[I 220321 12:00:02 command:101] Broker: redis://localhost:6379//
[I 220321 12:00:03 command:102] Registered tasks: 
    ['app.tasks.sum', 'server.celery.debug_task']


Appendix

(23/1/2) kombu.exceptions.EncodeError: Object not JSON serializable in celery

[2022-04-18 10:49:51,965: INFO/MainProcess] Connected to redis://redis:6379/0
File "../kombu/serialization.py", line 39, in _reraise_errors yield
During handling of the above exception, another exception occurred:

TypeError: Object of type set is not JSON serializable
Traceback (most recent call last):

CeleryMessanger(redis, rabbitMQ) 를 통해서 Django 와 연결 합니다. 이때 데이터 타입은 Json serializer 만 가능해서 Celery 로 예약된 함수의 return 값은 Json 으로 전달을 할 수 있는 string, dict, list 중 하나의 형태로 변환을 하면 해당 오류는 해결됩니다.


참고문서