django-import-export 를 활용해서 data 를 적재하고자 했다. 여기서 문제가 발생했는데 import 데이터가 5,000개를 넘어가니 꽤 오랜시간이 걸렸다. 약 8분 정도였다. 따라서 이 작업을 줄이고자 노력했다. use_bulk 나 batch_size, chunk_size 등 다양한 옵션을 활용했지만 도무지 시간이 줄어들지 않았다. 따라서 어떤 부분이 시간이 오래걸리나 살펴보다가 import_data 메소드에서 비동기를 활용해 데이터를 처리하면 어떨까? 라는 생각이 들어서 적용했다. 어느정도 성과가 있어서 비동기를 적용한 후에 3분 안팎으로 걸리게 수정했다.
사용한 비동기 함수는 2 가지다. aync iterator와 import_data 를 async 하게 바꿨다.
0. django-import-export 설정
class FooResources(resources.ModelResource):
class Meta:
...
skip_unchanged = True
report_skipped = False
use_bulk = True
async_size = 10 # 비동기 실행 횟수 내가 커스텀한 설정
batch_size = 1000
chunk_size = 200
store_row_values = True
use_transactions = False
...
자세한 설정은 공식 문서를 살펴보자.
1. async for
async def enumerate_async(iterable, start_idx=0):
for i, item in enumerate(iterable, start_idx):
yield i, item
2. async import_data
import asyncio
async def async_import_data(
self,
dataset,
dry_run=False,
raise_errors=False,
use_transactions=None,
collect_failed_rows=False,
rollback_on_validation_errors=False,
**kwargs,
):
"""
리스트에 항목들을 비동기적으로 추가하는 함수
"""
items = []
item = []
tasks = [] # asyncio.Task 객체를 담을 리스트
headers = dataset.headers
total = len(dataset)
split_size = (total // self.Meta.async_size) + 1 # 비동기 실행을 위해 데이터를 쪼개는 양
async for idx, data in enumerate_async(dataset, 1):
is_final = idx == total # 마지막 루프일때
is_split_data = idx % split_size == 0 # 설정한 개수만큼 데이터를 쪼갠다.
item.append(data)
if is_split_data or is_final:
items.append(tablib.Dataset(*item, headers=headers))
item = []
for item in items:
task = asyncio.create_task(
sync_to_async(super().import_data)(
item,
dry_run,
raise_errors,
use_transactions,
collect_failed_rows,
rollback_on_validation_errors,
**kwargs,
)
)
tasks.append(task)
return await asyncio.gather(*tasks)
이제 해당 비동기 함수를 실행하고, 결과값을 모아주자.
import asyncio
def import_data(
self,
dataset,
dry_run=False,
raise_errors=False,
use_transactions=None,
collect_failed_rows=False,
rollback_on_validation_errors=False,
**kwargs,
):
results = asyncio.run(
self.async_import_data(
dataset,
dry_run,
raise_errors,
use_transactions,
collect_failed_rows,
rollback_on_validation_errors,
**kwargs,
)
)
result = self.get_result_class()()
result.diff_headers = self.get_diff_headers()
result.total_rows = len(dataset)
# 비동기로 실행한 결과값들을 합치기 위한 로직
for result_ in results:
for key, value in result_.totals.items():
result.totals[key] += value # create, update, delete 한 개수를 알려주기 위해 넣는다.
result.invalid_rows += result_.invalid_rows
result.rows += result_.rows
return result
코드를 보면 고칠게 몇개 보이는데, 해당 아이디어가 잘 안떠오른다. 예를 들면 dataset 을 다시 instance 로 만들어서 합치는게 불필요할 거 같은데, 당장 떠오르는 생각이 없어서 저렇게 했다.
'Django' 카테고리의 다른 글
WebSocket 살펴보기 - 2 (0) | 2023.05.05 |
---|---|
DRF - Null인 경우 마지막에 배치하도록 하기 (0) | 2023.04.17 |
E2E 테스트할 때 외부 API Mock으로 대체하기 (0) | 2023.03.16 |
Django Filterset 활용하기 (0) | 2023.03.13 |
Django staticfile 관리하기 (0) | 2023.03.01 |