asyncio 뽀개기 2 - Future의 활용

Image not Found

Future를 잘 활용하면 단순히 await 하는 용도보다 더 다양한 흐름 제어를 할 수 있습니다. 이전 포스트에서는 asyncio의 핵심 컴포넌트인 코루틴과 Eventloop을 소개했습니다. 이번 포스트에서는 Future를 만드는 방법, Callback을 등록해서 활용하는 방법 등을 살펴볼 예정입니다.


Future란?

Future는 Python에 국한된 개념이 아니라 비동기 프로그래밍에서 널리 사용되는 개념이며, 주로 비동기 연산의 결과를 저장하는 객체를 나타냅니다. Java나 Javascript에서 접해보셨다면 이미 익숙한 개념일 것 같습니다. 동기 함수라면 함수가 blocking 후 종료했을 때 결과를 반환하기 때문에 그 값을 사용하면 됩니다. 하지만 비동기 함수의 경우, 호출하는 곳에서 의도적으로 연산 종료를 기다리지 않는 이상 반환 값을 바로 받아볼 수 없습니다. 그래서 Future는 비동기 함수가 호출한 곳에 “지금은 반환 값이 없는데 나중에 이곳에 값을 채워줄게” 라는 목적으로 만들어서 주는 것입니다.

꼭 Future를 알아야할까?

공식 문서에서 설명한 것처럼, Future 객체를 외부에 노출하는 것은 권장되지 않기 때문에 고수준에서만 asyncio를 사용하면 Future를 만날 기회가 적습니다. 하지만 비동기 프로그래밍에서 중요한 개념이고, 저수준에서 asyncio를 다루려면 필수적으로 알아야 효율적인 코드를 작성할 수 있습니다.

Python에서의 Future

Python에는 사실 2개의 서로 다른 Future 클래스가 있습니다. 하나는 concurrent.futures.Future이고, 다른 하나는 asyncio.Future입니다. 이후에 등장하는 모든 Future는 후자임을 먼저 밝힙니다. 두 클래스는 서로 다른 시기에 다른 목적을 위해 만들어졌기 때문에 서로 호환되지 않습니다. 대신 asyncio.wrap_future()를 이용하면 전자를 후자로 바꿀 수 있습니다.

asyncio에서의 Future

위에서 설명한 것처럼 asyncio 모듈에서 Future가 직접 노출되는 부분은 적습니다. loop.create_future()asyncio.create_task()가 가장 흔한데, 후자는 Eventloop에 코루틴을 등록하는 역할을 하므로 실제로 조작하는 경우는 흔치 않습니다.

Future의 기능

독자의 편의를 위해 간략하게 특징/기능을 나열하고 넘어가겠습니다.

  • Awaitable이기 때문에 await 키워드로 결과를 기다릴 수 있음
  • done()으로 연산이 끝났는지 확인 가능
  • set_result()로 연산의 결과를 지정
  • cancel()로 연산을 취소 가능
  • add_done_callback()으로 콜백함수 등록 가능

Future 활용

활용 1. 콜백

이전 포스트의 요구사항에 몇 가지 조건을 추가 후 구현하면서 Future를 활용해보겠습니다.

요구사항

기존

  • 10초마다 Job이 생성되는데, 각 Job은 랜덤한 시간을 소요함
    • 이전 Job의 수행시간이 10초를 넘든 안 넘든 상관없이 10초마다 새로 생성되어야 함
  • Job들은 concurrent 하게 실행됨

새로 추가되는 항목

  • 각 Job이 끝났을 때 총 수행 시간을 출력함
    • Job이 성공/실패(예외 발생) 여부와 상관없이 출력돼야 함
구현 1. Job 수정

가장 간단한 방법은 Job이 직접 출력을 하도록 수정하는 것입니다.

import asyncio
import time
from random import randint

async def run_job() -> None:
    before = time.time()

    try:
      delay = randint(5, 15)
      await asyncio.sleep(delay)  # 5~15초 동안 잠자기
    finally:
      after = time.time()
      print(f'Job duration: {after - before} sec')

async def main() -> None:
    while True:
        asyncio.create_task(run_job())
        await asyncio.sleep(10)

asyncio.run(main())

위와 같은 간단한 예제에는 이렇게 구현해도 큰 무리가 없지만, 애플리케이션에 사용하기엔 문제가 있을 수 있습니다. 대부분은 이렇게 구현하면 SOLID의 단일 책임 원칙을 어기는 코드를 만들게 됩니다. 만약 이 예제 코드가 주기적으로 Job을 스케줄링하는 라이브러리라고 생각하면, 모니터링 기능을 위해서 사용자가 Job을 수정해야 하는 상황이 발생하는 것이죠.

구현 2. 콜백
import asyncio
import functools
import time
from random import randint

async def run_job() -> None:
    delay = randint(5, 15)
    await asyncio.sleep(delay)  # 5~15초 동안 잠자기

def print_wall_time(before: float, _: asyncio.Future[None]) -> None:
    after = time.time()
    print(f'Job duration: {after - before} sec')

async def main() -> None:
    while True:
        before = time.time()
        future = asyncio.create_task(run_job())
        future.add_done_callback(functools.partial(print_wall_time, before))
        await asyncio.sleep(10)

asyncio.run(main())

Future에서 제공하는 콜백기능을 이용해서 다시 구현했습니다. run_job()은 이전 포스트의 구현 그대로 변한 것이 없고, main()에 모니터링을 위해 콜백을 등록하는 코드가 추가됐습니다. 또한, 모니터링을 위한 로직이 하나의 함수 print_wall_time()으로 분리되었습니다. Future.add_done_callback()은 Future가 정상 종료, 예외 발생, cancel()을 통한 취소 등 종료만 된다면 항상 실행되기 때문에 try-finally 같은 코드가 없이도 목적을 달성할 수 있습니다.

Future는 콜백함수 파라미터로 객체 자신을 넘겨주기 때문에 원한다면 아래와 같이 Job의 결과까지 모니터링할 수 있습니다.

import asyncio
import functools
import time
from random import randint

async def run_job() -> int:
    delay = randint(5, 15)
    await asyncio.sleep(delay)  # 5~15초 동안 잠자기
    return randint(0, 1000)  # 임의의 연산 결과

def print_wall_time(before: float, future: asyncio.Future[int]) -> None:
    after = time.time()
    if future.exception() is not None:
        print(f'Error occurred: {future.exception()}, duration: {after - before} sec')
    else:
        print(f'Result: {future.result()}, duration: {after - before} sec')

async def main() -> None:
    while True:
        before = time.time()
        future = asyncio.create_task(run_job())
        future.add_done_callback(functools.partial(print_wall_time, before))
        await asyncio.sleep(10)

asyncio.run(main())
유의사항

그렇다면 Future에 추가된 콜백함수는 언제 어떤 순서로 호출될까요?
공식 문서를 봐도 그러한 명세는 찾을 수 없습니다. 그리고 Future는 Eventloop에서 자신에게 최적화된 구현체를 가질 수 있기 때문에 어떤 규칙을 보장받을 순 없을 것 같습니다. 다만 기본 구현체는 콜백 등록에 loop.call_soon() 메소드를 사용하고 있는데, Future 실행이 끝난 이후에 다음 Eventloop iteration에 콜백 함수 실행을 등록하며, 콜백함수가 등록된 순서대로 실행을 시작할 것 같습니다. 결론적으로 명세상 보장받을 수 없고, 순서나 시점을 보장받아야 하는 경우 별도의 장치가 필요합니다.

async 함수(코루틴 함수)를 콜백으로 등록하고 싶으면 어떻게 해야 할까요?
Future.add_done_callback()은 코루틴 함수가 아닌 일반 함수를 기대하기 때문에 코루틴 함수를 실행할 수는 없습니다. 대신 코루틴 실행을 Eventloop에 등록할 수는 있는데, Future.add_done_callback(lambda fut: asyncio.create_task(some_async_func(fut))) 같이 사용하시면 됩니다. asyncio.create_task()를 사용했기 때문에 당연하게도 코루틴이 종료되는 것을 await으로 기다릴 수 없습니다. 그 이유에 대해서 궁금하시면 이전 포스트를 참고해주세요.

활용 2. Future 발행

asyncio 모듈의 많은 유틸들이 (Queue, gather, as_completed 등) 내부적으로는 Future 기반으로 작성되어있습니다. 그중에서도 Semaphore를 간단하게 구현해보면서 Future의 또 다른 활용법을 살펴보겠습니다. 아래 인터페이스를 먼저 제시해 놓았기 때문에 먼저 직접 구현해보신 이후에 코드를 확인해보시길 추천해 드립니다.

인터페이스 & 테스트 코드
import asyncio
from abc import ABCMeta
from datetime import datetime

# 인터페이스
class Semaphore(metaclass=ABCMeta):
    _value: int

    def __init__(self, initial_value: int = 1) -> None:
        self._value = initial_value

    async def acquire(self) -> None:
        raise NotImplementedError

    def release(self) -> None:
        raise NotImplementedError

# 테스트 코드
async def run_job(sem: Semaphore, job_id: int) -> None:
    await sem.acquire()
    print(f'{datetime.now()} - start job {job_id}')
    await asyncio.sleep(1)
    print(f'{datetime.now()} - job {job_id} finished')
    sem.release()

async def main() -> None:
    sem = SomeSemaphore(2)  # 구현체로 변경 필요

    await asyncio.gather( # 한번에 2개의 Job 묶음이 1초 간격으로 실행되어야함
      run_job(sem, 1),
      run_job(sem, 2),
      run_job(sem, 3),
      run_job(sem, 4),
      run_job(sem, 5),
    )

asyncio.run(main())
구현 1. busy waiting

busy waiting 방식이 가장 쉽게 떠올릴 수 있는 구현일 것 같습니다.

class BusyWaitingSemaphore(Semaphore):
    async def acquire(self) -> None:
        while self._value <= 0:
            await asyncio.sleep(0.1)
        self._value -= 1

    def release(self) -> None:
        self._value += 1

당연하게도 글 제목인 Future가 코드에 없으므로 문제가 있는 코드라는 것을 알 수 있습니다. 이 코드는 비효율적인데, sleep을 사용해 주기적으로 release가 있었는지 확인하는 polling 기반이기 때문입니다. 운이 좋지 않으면 release가 호출됐더라도 0.1초 이후에야 acquire가 반환되는 상황이 발생할 수 있는 것이죠. 그러한 딜레이를 해결하기 위해서 sleep delay를 0.01로 바꾸면 polling 주기가 잦아지기 때문에 Eventloop에 오버헤드가 생기게 됩니다.

구현 2. Future 발행

Future를 사용하면 busy waiting 방식의 polling을 걷어내고 비동기적으로 훨씬 더 효율적인 Semaphore를 만들 수 있습니다. 지금까지는 asyncio 모듈이나 관련 라이브러리에서 반환 값으로 주는 Future 객체에 await을 붙이거나 콜백을 붙였다면, 이번에는 직접 Future 객체를 생성하고 관리하는 것입니다.

import asyncio
from collections import deque

class FutureSemaphore(Semaphore):
    _waiters: deque[asyncio.Future[None]]

    def __init__(self, initial_value: int = 1) -> None:
        super().__init__(initial_value)
        self._waiters = deque()

    async def acquire(self) -> None:
        if self._value <= 0:
            loop = asyncio.get_running_loop()
            future = loop.create_future()
            self._waiters.append(future)
            await future

        self._value -= 1

    def release(self) -> None:
        self._value += 1
        if len(self._waiters) > 0:
            fut = self._waiters.popleft()
            fut.set_result(None)

BusyWaitingSemaphore.acquire()에서 while loop이 Future를 사용하도록 변경됐습니다. 직접 비어있는 Future 객체를 만들고 await 키워드로 Future가 끝나길 기다리는 것이죠. 그렇다면 언제 해당 Future가 await에서 깨어나면 될까요? 누군가 release()를 호출해서 Semaphore에 공석이 생겼을 때 기다리고 있는 Future를 깨워주면 되는 것이죠. 그것을 위해서 acquire()에는 생성한 Future 객체를 _waiters 변수에 추가해서 기다리고 있다는 사실을 알리고, release()에서는 함수 끝에 기다리고 있는 Future가 있는지 찾아서 set_result()로 Future를 끝내는 것입니다.

실제로 asyncio.Semaphore 또한 유사한 방식으로 구현되어있습니다. (물론 위의 예제 코드로는 해결하지 못하는 복잡한 상황을 처리하기 위해 조금 더 복잡하긴 합니다.)

유의사항

Future 객체 생성 방식 공식 문서에 나와 있는 것처럼 asyncio.Future()와 같이 직접 객체를 생성하기보다, loop.create_future()로 생성해야 합니다. Eventloop이 자신에게 더 최적화된 구현체를 제공할 수도 있기 때문입니다.

예제 코드의 thread-safety 위의 예제 코드는 모두 thread-safe 하지 않지만, 싱글 스레드에서는 항상 안전합니다. 이것은 asyncio.Semaphore도 마찬가지인데, Eventloop은 하나의 스레드에서 실행되기 때문입니다. release()_value_waiters 수정이 atomic 하지 않아도 싱글 스레드에서 안전한 이유는, 이전 포스트의 Cooperative multitasking 부분을 참고해주세요.

마치며

이번 포스트에서는 Future를 단순히 await 하는 것 보다 조금 더 복잡한 용례를 살펴봤습니다. 다음 포스트에서는 Eventloop의 signal handling 방법을 살펴보면서 SIGINT, SIGTERM 같은 종료 시그널을 올바르게 처리하는 방법을 설명하겠습니다.


버즈빌 개발자 지원하기 (클릭)

버즈빌 테크 리크루터와 Coffee Chat하기 (클릭)

You May Also Like

post-thumb

배포를 빠르게 - DIY(Deploy It Yourself)

이 글에선 버즈빌에선 어떻게 배포의 속도를 높였는지 소개합니다. 배포를 우아하게 - 원-클릭 배포 배포를 안전하게 - 카나리 배포 전략, 롤백 배포를 빠르게 - DIY(Deploy It Yourself) 이전 글들에서 버즈빌의 배포 파이프라인의 세부적인 내용들을 설 …

Read Article
post-thumb

상품 추천 알고리즘 Item-CF의 최적화 여정

만 30개의 이커머스을 광고주로 삼고 있는 버즈빌. 이들을 위한 리타겟팅 광고 솔루션을 고도화하기 위해 도입된 상품 추천 알고리즘: Item-CF. 이 알고리즘 최적화와 적용 과정에 대해서 알아보겠습니다. 버즈빌의 리타겟팅 광고 솔루션 버즈빌에서는 매출의 20% …

Read Article
버즈빌, 아마도 당신이 원하던 회사!

지원하기