Django

django-import-export 비동기 활용하기

r잡초처럼 2023. 4. 13. 23:14

django-import-export 를 활용해서 data 를 적재하고자 했다. 여기서 문제가 발생했는데 import 데이터가 5,000개를 넘어가니 꽤 오랜시간이 걸렸다. 약 8분 정도였다. 따라서 이 작업을 줄이고자 노력했다. use_bulk 나 batch_size, chunk_size 등 다양한 옵션을 활용했지만 도무지 시간이 줄어들지 않았다. 따라서 어떤 부분이 시간이 오래걸리나 살펴보다가 import_data 메소드에서 비동기를 활용해 데이터를 처리하면 어떨까? 라는 생각이 들어서 적용했다. 어느정도 성과가 있어서 비동기를 적용한 후에 3분 안팎으로 걸리게 수정했다.

 

사용한 비동기 함수는 2 가지다. aync iterator와 import_data 를 async 하게 바꿨다.

0. django-import-export 설정

 

class FooResources(resources.ModelResource):
    class Meta:
        ...
        skip_unchanged = True
        report_skipped = False
        use_bulk = True
        async_size = 10  # 비동기 실행 횟수 내가 커스텀한 설정
        batch_size = 1000
        chunk_size = 200
        store_row_values = True
        use_transactions = False
        ...

자세한 설정은 공식 문서를 살펴보자.

1. async for

async def enumerate_async(iterable, start_idx=0):
    for i, item in enumerate(iterable, start_idx):
        yield i, item

2. async import_data

 import asyncio
 async def async_import_data(
        self,
        dataset,
        dry_run=False,
        raise_errors=False,
        use_transactions=None,
        collect_failed_rows=False,
        rollback_on_validation_errors=False,
        **kwargs,
    ):
        """
        리스트에 항목들을 비동기적으로 추가하는 함수
        """
        items = []  
        item = []
        tasks = [] # asyncio.Task 객체를 담을 리스트
        headers = dataset.headers
        total = len(dataset)
        split_size = (total // self.Meta.async_size) + 1  # 비동기 실행을 위해 데이터를 쪼개는 양

        async for idx, data in enumerate_async(dataset, 1):
            is_final = idx == total  # 마지막 루프일때
            is_split_data = idx % split_size == 0  # 설정한 개수만큼 데이터를 쪼갠다. 
            item.append(data)
            if is_split_data or is_final:
                items.append(tablib.Dataset(*item, headers=headers))
                item = []

        for item in items:
            task = asyncio.create_task(
                sync_to_async(super().import_data)(
                    item,
                    dry_run,
                    raise_errors,
                    use_transactions,
                    collect_failed_rows,
                    rollback_on_validation_errors,
                    **kwargs,
                )
            )
            tasks.append(task)
        return await asyncio.gather(*tasks)

이제 해당 비동기 함수를 실행하고, 결과값을 모아주자.

import asyncio
def import_data(
        self,
        dataset,
        dry_run=False,
        raise_errors=False,
        use_transactions=None,
        collect_failed_rows=False,
        rollback_on_validation_errors=False,
        **kwargs,
    ):
        results = asyncio.run(
            self.async_import_data(
                dataset,
                dry_run,
                raise_errors,
                use_transactions,
                collect_failed_rows,
                rollback_on_validation_errors,
                **kwargs,
            )
        )
        result = self.get_result_class()()
        result.diff_headers = self.get_diff_headers()
        result.total_rows = len(dataset)
        # 비동기로 실행한 결과값들을 합치기 위한 로직
        for result_ in results:
            for key, value in result_.totals.items():
                result.totals[key] += value  # create, update, delete 한 개수를 알려주기 위해 넣는다.             
            result.invalid_rows += result_.invalid_rows 
            result.rows += result_.rows

        return result

코드를 보면 고칠게 몇개 보이는데, 해당 아이디어가 잘 안떠오른다. 예를 들면 dataset 을 다시 instance 로 만들어서 합치는게 불필요할 거 같은데, 당장 떠오르는 생각이 없어서 저렇게 했다.