django-import-export 를 활용해서 data 를 적재하고자 했다. 여기서 문제가 발생했는데 import 데이터가 5,000개를 넘어가니 꽤 오랜시간이 걸렸다. 약 8분 정도였다. 따라서 이 작업을 줄이고자 노력했다. use_bulk 나 batch_size, chunk_size 등 다양한 옵션을 활용했지만 도무지 시간이 줄어들지 않았다. 따라서 어떤 부분이 시간이 오래걸리나 살펴보다가 import_data 메소드에서 비동기를 활용해 데이터를 처리하면 어떨까? 라는 생각이 들어서 적용했다. 어느정도 성과가 있어서 비동기를 적용한 후에 3분 안팎으로 걸리게 수정했다.
사용한 비동기 함수는 2 가지다. aync iterator와 import_data 를 async 하게 바꿨다.
0. django-import-export 설정
class FooResources(resources.ModelResource):
    class Meta:
        ...
        skip_unchanged = True
        report_skipped = False
        use_bulk = True
        async_size = 10  # 비동기 실행 횟수 내가 커스텀한 설정
        batch_size = 1000
        chunk_size = 200
        store_row_values = True
        use_transactions = False
        ...
자세한 설정은 공식 문서를 살펴보자.
1. async for
async def enumerate_async(iterable, start_idx=0):
    for i, item in enumerate(iterable, start_idx):
        yield i, item
2. async import_data
 import asyncio
 async def async_import_data(
        self,
        dataset,
        dry_run=False,
        raise_errors=False,
        use_transactions=None,
        collect_failed_rows=False,
        rollback_on_validation_errors=False,
        **kwargs,
    ):
        """
        리스트에 항목들을 비동기적으로 추가하는 함수
        """
        items = []  
        item = []
        tasks = [] # asyncio.Task 객체를 담을 리스트
        headers = dataset.headers
        total = len(dataset)
        split_size = (total // self.Meta.async_size) + 1  # 비동기 실행을 위해 데이터를 쪼개는 양
        async for idx, data in enumerate_async(dataset, 1):
            is_final = idx == total  # 마지막 루프일때
            is_split_data = idx % split_size == 0  # 설정한 개수만큼 데이터를 쪼갠다. 
            item.append(data)
            if is_split_data or is_final:
                items.append(tablib.Dataset(*item, headers=headers))
                item = []
        for item in items:
            task = asyncio.create_task(
                sync_to_async(super().import_data)(
                    item,
                    dry_run,
                    raise_errors,
                    use_transactions,
                    collect_failed_rows,
                    rollback_on_validation_errors,
                    **kwargs,
                )
            )
            tasks.append(task)
        return await asyncio.gather(*tasks)
이제 해당 비동기 함수를 실행하고, 결과값을 모아주자.
import asyncio
def import_data(
        self,
        dataset,
        dry_run=False,
        raise_errors=False,
        use_transactions=None,
        collect_failed_rows=False,
        rollback_on_validation_errors=False,
        **kwargs,
    ):
        results = asyncio.run(
            self.async_import_data(
                dataset,
                dry_run,
                raise_errors,
                use_transactions,
                collect_failed_rows,
                rollback_on_validation_errors,
                **kwargs,
            )
        )
        result = self.get_result_class()()
        result.diff_headers = self.get_diff_headers()
        result.total_rows = len(dataset)
        # 비동기로 실행한 결과값들을 합치기 위한 로직
        for result_ in results:
            for key, value in result_.totals.items():
                result.totals[key] += value  # create, update, delete 한 개수를 알려주기 위해 넣는다.             
            result.invalid_rows += result_.invalid_rows 
            result.rows += result_.rows
        return result
코드를 보면 고칠게 몇개 보이는데, 해당 아이디어가 잘 안떠오른다. 예를 들면 dataset 을 다시 instance 로 만들어서 합치는게 불필요할 거 같은데, 당장 떠오르는 생각이 없어서 저렇게 했다.
'Django' 카테고리의 다른 글
| WebSocket 살펴보기 - 2 (0) | 2023.05.05 | 
|---|---|
| DRF - Null인 경우 마지막에 배치하도록 하기 (0) | 2023.04.17 | 
| E2E 테스트할 때 외부 API Mock으로 대체하기 (0) | 2023.03.16 | 
| Django Filterset 활용하기 (0) | 2023.03.13 | 
| Django staticfile 관리하기 (0) | 2023.03.01 |