사용자 도구

사이트 도구


비동기_프로그래밍과_scala

비동기 프로그래밍과 Scala

비동기(asynchrony)는 어떤 곳에서도 사용될 수 있는, 병행성(concurrency)을 포괄하는 개념입니다. 이 글은 비동기 처리가 무엇인지, 어떤 문제를 해결해야 하는지에 관해 다룹니다. 1)

소개

비동기는 멀티스레딩(multithreading)보다 넓은 개념임에도 불구하고, 일부 개발자들은 비동기와 멀티스레딩을 혼동합니다. 비동기와 멀티스레딩의 관계는 다음과 같습니다.

Multithreading <: Asynchrony

비동기 연산은 다음과 같이 형(type)을 통해서 표현할 수도 있습니다.

type Async[A] = (Try[A] => Unit) => Unit

여러 개의 Unit 반환형(return type)이 더럽다고 느껴지는 이유는, 바로 비동기가 더럽기 때문입니다. 다음과 같은 특성을 갖는 모든 작업, 프로세스, 네트워크 상의 한 노드를 비동기 연산이라고 할 수 있습니다.

  1. (호출자의 입장에서) 프로그램의 메인 플로우 밖에서 실행되거나, 현재의 호출 스택에서 실행되지 않음
  2. 작업이 끝나고 나면 실행되는 콜백을 사용함
  3. 결과가 나올 것인지, 혹은 언제 나올 것인지 보장되지 않음

비동기가 병행성(concurrency)을 포괄하긴 하지만, 그것이 필연적으로 멀티스레딩을 포괄하는 것이 아님은 매우 중요합니다. 예를 들어, JavaScript에서는 대부분의 I/O 동작이 비동기적일 뿐만 아니라, 아주 무거운 비지니스 로직도 (setTimeout에 기반한 스케쥴링을 통해) 비동기적으로 처리될 수 있기 때문에, 인터페이스가 계속해서 응답할 수 있습니다. 2)

프로그램에 비동기를 도입하는 일은, 병행성 문제(concurrency problems)를 일으킵니다. 우리는 비동기 연산이 언제 끝날지 절대로 알 수 없어서, 동시에 작동한 여러 개의 비동기 연산의 결과를 모으는 일은 동기화(synchronization)를 필요로 하게 되기 때문입니다. 동기화 작업은 프로그램이 더 이상 동작의 순서에 의존하지 않도록 하는 작업이기도 한데, 순서로부터의 독립은 바로 비결정론적 알고리즘의 핵심 요소이기도 합니다.

비결정론적(nondeterministic) 알고리즘은 같은 입력에 대해서도 새로운 실행에서는 이전과 다르게 동작할 수 있는 알고리즘을 가리킨다. 결정론적(deterministic) 알고리즘과 반대 관계이다. … 병행(concurrent) 알고리즘은 경쟁 상태(race condition)로 인해 새로운 실행에서 이전과 다르게 동작할 수 있다. Nondeterministic algorithm, 영문 위키피디아

비결정론적 알고리즘

실력 있는 독자라면, 사용되는 방식과 환경에 따라서 조금씩의 차이는 있지만 이러한 종류의 문제가 어디에서나 발견된다는 사실을 눈치 챌 수 있습니다.

위의 모든 추상화들은 비동기를 처리하는 조금 나은 방법들입니다.

커다란 허상

비동기적인 결과를 동기적인 결과로 변환하는 함수는 흔히 아래와 같이 기술됩니다.

def await[A](fa: Async[A]): A

그러나 우리는 비동기 처리가 다른 일반적인 함수들과 같다고 가정해서는 안됩니다. 그 이유는 CORBA의 실패로부터도 배울 수 있습니다.

비동기 처리에 있어서 우리는 흔히 분산 컴퓨팅의 함정 (fallacies of distributed computing)에 빠지곤 합니다.

  1. 네트워크는 신뢰할 수 있다 (The network is reliable)
  2. 레이턴시는 존재하지 않는다 (Latency is zero)
  3. 대역폭은 무한하다 (Bandwidth is infinite)
  4. 네트워크의 보안은 완벽하다 (The network is secure)
  5. 네트워크 토폴로지는 변하지 않는다 (-Topology doesn't change)
  6. 관리자는 한 명 뿐이다 (There is one administrator)
  7. 전송 비용은 없다 (Transport cost is zero)
  8. 네트워크는 동질적이다 (The network is homogeneous)

당연히 위의 명제는 모두 거짓입니다. 다시 말해, 우리는 흔히 네트워크 상에서 발생한 오류, 레이턴시, 패킷 손실, 대역폭 제한을 충분히 고려하지 않은 코드를 작성한다는 의미입니다.

개발자들은 아래와 같은 방식으로 이러한 문제에 대처해 왔습니다.

이렇게 해결법이 많다는 것은, 그 중에 무엇도 일반적인 목적으로 비동기를 처리하기에 적합하지 않다는 것을 의미합니다. 메모리 관리와 비동기(asynchrony) 처리는 우리 소프트웨어 개발자들이 마주하고 있는 가장 큰 문제라는 점에서, '은 총알은 없다' 딜레마와도 관련이 있습니다.

원주 경고: 개인적인 의견과 불평입니다.

어떤 사람들은 Golang과 같은 M:N 플랫폼을 자랑합니다만, 저는 JVM이나 .Net과 같은 1:1 멀티스레딩 플랫폼을 좋아합니다.

프로그래밍 언어에 충분한 표현성(Scala의 Future와 Promise, Task, Closure의 core.async)이 있는 한, 우리는 1:1 멀티스레딩 플랫폼 위에 M:N 플랫폼도 구현할 수 있기 때문입니다. 하지만 M:N 런타임에서 M:N 멀티스레딩에 문제가 발생했을 때는, 플랫폼을 바꾸지 않고서는 문제를 해결할 수 없습니다. 그리고 실제로 대부분의 M:N 플랫폼은 자주 문제가 생깁니다.

모든 가능한 해결법을 배우고 결정을 내리는 것은 실로 엄청나게 고통스러운 작업이지만, 그래도 충분히 알지 못하고 결정을 내리는 것보다는 낫습니다. 이 문제에 한해서는 TOOWTDI 혹은 “나쁜 기능이 더 낫다”를 신념으로 갖지 않는 것이 좋습니다. Scala나 Haskell과 같은 표현적인(expressive) 신생 언어의 학습 난이도가 너무 높다는 불평은 문제의 요점과 떨어져 있습니다. 새로운 언어를 배우는 것은 병행성(concurreny)을 다루는 데에 있어서 가장 사소한 문제이기 때문입니다. 저는 심지어 병행성의 문제를 해결하지 못해 소프트웨어 업계를 떠난 사람들도 알고 있습니다.

콜백 지옥

예를 들어서, 두 개의 비동기 작업을 시작하고 그 결과들을 합친다고 합시다.

먼저 작업을 비동기적으로 실행하는 함수를 정의합니다.

import scala.concurrent.ExecutionContext.global
 
type Async[A] = (A => Unit) => Unit
 
def timesTwo(n: Int): Async[Int] =
  onFinish => {
    global.execute(new Runnable {
      def run(): Unit = {
        val result = n * 2
        onFinish(result)
      }
    })
  }
 
// 사용법
timesTwo(20) { result => println(s"결과: $result") }
//=> 결과: 40

연속 실행 (side-effect의 연옥)

하나의 실행이 끝나면 다음을 연속해서 실행하는 방법으로 다음과 같이 두 개의 비동기 결괏값을 합칠 수 있습니다.

def timesFour(n: Int): Async[Int] =
  onFinish => {
    timesTwo(n) { a =>
      timesTwo(n) { b =>
        // 두 개의 결과를 합친다
        onFinish(a + b)
      }
    }
  }
 
// 사용법
timesFour(20) { result => println(s"결과: $result") }
//=> 결과: 80

두 개의 결괏값을 합치는 정도는 간단합니다.

하지만 비동기성은 관련된 모든 것들에 영향을 미친다는 문제가 있습니다. 여러분이 다음과 같은 순수 함수를 만들었다고 합시다.

def timesFour(n: Int): Int = n * 4

하지만 EJB를 사용하려는 EA가 여러분에게 순수 함수가 아닌 비동기 timesTwo 함수를 사용하라고 지시합니다. 이제 여러분의 timesFour 구현은 순수한 수학적 함수에서 side-effect가 넘쳐나는 함수가 될 수밖에 없습니다. 충분히 좋은 Async형이 없다면 우리는 side-effect가 넘쳐나는 콜백으로 이루어진 작업 흐름을 다뤄야 합니다. 물론 결괏값이 나올 때까지 봉쇄(block)하는 것도 도움이 될 수 없습니다. 그저 문제를 숨기는 것일 뿐이니까요. 2장을 보면 이유를 알 수 있습니다.

게다가 문제는 점점 더 복잡해집니다. 😷

병렬 실행 (비결정론의 림보)

위의 '연속 실행'에서의 두 번째 실행은 첫 번째 실행에 의존하지 않습니다. 즉, 두 실행은 병렬적(parallel)으로 처리될 수 있습니다. JVM에서는 CPU-bound 작업3)들을 병렬적으로 실행할 수 있고, JavaScript에서도 Ajax 요청을 보내거나 Web Worker API를 사용할 때 병렬 처리가 가능합니다.

안타깝게도 병렬 실행에서는 일이 조금 복잡해집니다. 아래와 같은 순진한 접근 방법은 완전히 틀렸습니다.

// 나쁜 예시
def timesFourInParallel(n: Int): Async[Int] =
  onFinish => {
    var cacheA = 0
 
    timesTwo(n) { a => cacheA = a }
 
    timesTwo(n) { b =>
      // 두 개의 결과를 합친다
      onFinish(cacheA + b)
    }
  }
 
timesFourInParallel(20) { result => println(s"결과: $result") }
//=> 결과: 80
 
timesFourInParallel(20) { result => println(s"결과: $result") }
//=> 결과: 40

바로 이것이 비결정론의 실제 예시입니다. 두 개의 작업이 종료되는 순서가 보장되지 않기 때문에 우리는 동기화를 수행하는 소규모의 상태 기계(state machine)를 구현해야 합니다.

먼저 상태 기계의 ADT를 정의합니다.

// 상태 기계 정의
sealed trait State
// 초기 상태
case object Start extends State
// B가 종료되어 A를 대기하는 상태
final case class WaitForA(b: Int) extends State
// A가 종료되어 B를 대기하는 상태
final case class WaitForB(a: Int) extends State

다음으로, 상태 기계를 비동기적으로 구현합니다.

// JVM에서는 나쁜 예시 (JavaScript에서는 돌아감)
def timesFourInParallel(n: Int): Async[Int] = {
  onFinish => {
    var state: State = Start
 
    timesTwo(n) { a =>
      state match {
        case Start =>
          state = WaitForB(a)
        case WaitForA(b) =>
          onFinish(a + b)
        case WaitForB(_) =>
          // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록
          throw new IllegalStateException(state.toString)
      }
    }
 
    timesTwo(n) { b =>
      state match {
        case Start =>
          state = WaitForA(b)
        case WaitForB(a) =>
          onFinish(a + b)
        case WaitForA(_) =>
          // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록
          throw new IllegalStateException(state.toString)
      }
    }
  }
}

위의 해결법을 시각화하면 아래와 같은 상태 기계로 나타낼 수 있습니다.

하지만 아직 문제가 남아있습니다. JVM은 1:1 멀티스레딩 플랫폼이기 때문에 공유 메모리에 병행적으로 접근(shared memory concurrency)하게 됩니다. 따라서 state에 대한 접근이 동기화되어야 합니다.

사용할 수 있는 한 가지 해결 방법은 synchronized 블록을 사용하는 것입니다. 이 방법은 '암묵적인 락(intrinsic lock)'이라고도 부릅니다.

// 모니터처럼 작동할 수 있는 공통 참조 생성
val lock = new AnyRef
var state: State = Start
 
timesTwo(n) { a =>
  lock.synchronized {
    state match {
      case Start =>
        state = WaitForB(a)
      case WaitForA(b) =>
        onFinish(a + b)
      case WaitForB(_) =>
        // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록
        throw new IllegalStateException(state.toString)
    }
  }
}
 
//...

이러한 고수준의 락(high-level lock)은 여러 개의 스레드가 병렬적(parallel)으로 (위의 state와 같은) 리소스에 접근하지 못하도록 보호합니다. 하지만 저는 개인적으로 고수준 락의 사용을 피합니다. 커널의 스케쥴러는 임의의 스레드를 임의의 이유로 정지시킬 수 있는데, 락을 갖고 있는 스레드가 커널에 의해 정지되면 다른 모든 스레드가 정지되기 때문입니다. 정지를 피하고 끊임 없이 작업을 실행하기 위해서는 비봉쇄 알고리즘(non-blocking logic)이 더 낫습니다.

그래서 이 경우에는 AtomicReference를 사용하는 것이 정답입니다.

// JVM에서 잘 돌아가는 예시
 
import scala.annotation.tailrec
import java.util.concurrent.atomic.AtomicReference
 
def timesFourInParallel(n: Int): Async[Int] = {
  onFinish => {
    val state = new AtomicReference[State](Start)
 
    @tailrec def onValueA(a: Int): Unit =
      state.get match {
        case Start =>
          if (!state.compareAndSet(Start, WaitForB(a)))
            onValueA(a) // 재시도
        case WaitForA(b) =>
          onFinish(a + b)
        case WaitForB(_) =>
          // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록
          throw new IllegalStateException(state.toString)
      }
 
    timesTwo(n)(onValueA)
 
    @tailrec def onValueB(b: Int): Unit =
      state.get match {
        case Start =>
          if (!state.compareAndSet(Start, WaitForA(b)))
            onValueB(b) // 재시도
        case WaitForB(a) =>
          onFinish(a + b)
        case WaitForA(_) =>
          // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록
          throw new IllegalStateException(state.toString)
      }
 
    timesTwo(n)(onValueB)
  }
}
원주 MonixAtomic형을 사용하면 코드를 JavaScript / Scala.js로 크로스컴파일 하거나, 성능 향상을 위한 도구와 AtomicReference와 관련된 훌륭한 유틸리티들을 사용할 수 있습니다.

이제 조금 어려워졌나요? 조금 더 힘내봅시다.

재귀 실행 (분노의 StackOverflow)

위의 onFinish의 호출이 스택-안전하지 않고(stack-unsafe), 호출 시점에서 비동기 범위(asynchronous boundary)를 강제하지 않으면 프로그램이 StackOverflowError로 터질 수 있다고 한다면, 어떤 생각이 드시나요? 4)

말 그대로의 의미입니다. 앞서 작성한 코드를 제네릭한 방법(generic way)으로 다시 작성해 봅시다.

import scala.annotation.tailrec
import java.util.concurrent.atomic.AtomicReference
 
type Async[+A] = (A => Unit) => Unit
 
def mapBoth[A,B,R](fa: Async[A], fb: Async[B])(f: (A,B) => R): Async[R] = {
  // 상태 기계 정의
  sealed trait State[+A,+B]
  // 초기 상태
  case object Start extends State[Nothing, Nothing]
  // B가 종료되어 A를 대기하는 상태
  final case class WaitForA[+B](b: B) extends State[Nothing,B]
  // A가 종료되어 B를 대기하는 상태
  final case class WaitForB[+A](a: A) extends State[A,Nothing]
 
  onFinish => {
    val state = new AtomicReference[State[A,B]](Start)
 
    @tailrec def onValueA(a: A): Unit =
      state.get match {
        case Start =>
          if (!state.compareAndSet(Start, WaitForB(a)))
            onValueA(a) // 재시도
        case WaitForA(b) =>
          onFinish(f(a,b))
        case WaitForB(_) =>
          // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록
          throw new IllegalStateException(state.toString)
      }
 
    @tailrec def onValueB(b: B): Unit =
      state.get match {
        case Start =>
          if (!state.compareAndSet(Start, WaitForA(b)))
            onValueB(b) // retry
        case WaitForB(a) =>
          onFinish(f(a,b))
        case WaitForA(_) =>
          // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록
          throw new IllegalStateException(state.toString)
      }
 
    fa(onValueA)
    fb(onValueB)
  }
}

그리고 Scala의 Future.sequence와 유사한 기능을 구현합시다. 우리의 의지는 강인하고 용기는 무한하니까요. 😇

def sequence[A](list: List[Async[A]]): Async[List[A]] = {
  @tailrec def loop(list: List[Async[A]], acc: Async[List[A]]): Async[List[A]] =
    list match {
      case Nil =>
        onFinish => acc(r => onFinish(r.reverse))
      case x :: xs =>
        val update = mapBoth(x, acc)(_ :: _)
        loop(xs, update)
    }
 
  val empty: Async[List[A]] = _(Nil)
  loop(list, empty)
}
 
// 호출
sequence(List(timesTwo(10), timesTwo(20), timesTwo(30))) { r =>
  println(s"결과: $r")
}
//=> 결과: List(20, 40, 60)

이것으로 끝났을까요? 아닙니다.

val list = 0.until(10000).map(timesTwo).toList
sequence(list)(r => println(s"합: ${r.sum}"))

자, 위의 코드를 실행시키고 영광스러운 메모리 오류를 목도하십시오. 프로덕션 환경이었다면 아래의 오류는 Scala의 NonFatal로 처리되지도 않아서 프로그램이 크래시를 일으켰을 겁니다.

java.lang.StackOverflowError
  at java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:2414)
  at java.util.concurrent.ForkJoinPool.execute(ForkJoinPool.java:2630)
  at scala.concurrent.impl.ExecutionContextImpl$$anon$3.execute(ExecutionContextImpl.scala:131)
  at scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:20)
  at .$anonfun$timesTwo$1(<pastie>:27)
  at .$anonfun$timesTwo$1$adapted(<pastie>:26)
  at .$anonfun$mapBoth$1(<pastie>:66)
  at .$anonfun$mapBoth$1$adapted(<pastie>:40)
  at .$anonfun$mapBoth$1(<pastie>:67)
  at .$anonfun$mapBoth$1$adapted(<pastie>:40)
  at .$anonfun$mapBoth$1(<pastie>:67)
  at .$anonfun$mapBoth$1$adapted(<pastie>:40)
  at .$anonfun$mapBoth$1(<pastie>:67)

앞서 말씀드린 것과 같이, 강제된 비동기 범위 없이 실행된 onFinish는 스택 오버플로우를 유발할 수 있습니다. JavaScript에서는 setTimeout을 사용한 스케쥴링으로 문제를 해결할 수 있으며, JVM에서는 스레드 풀을 사용하거나 Scala의 ExecutionContext를 사용할 수 있습니다. 5)

Future와 Promise

scala.concurrent.Future는 earger evaluation이 적용되는 비동기 연산을 기술하는 방법입니다. 지금까지 위에서 사용한 Async형과도 유사합니다.

Future와 Promise는 일부 병행 프로그래밍 언어(concurrent programming language)에서 프로그램의 실행을 동기화하기 위해서 사용하는 생성자이다. Future와 Promise가 기술하는 객체는 연산이 아직 완료되지 않아 초기에는 계산할 수 없는 값에 대한 대리자(proxy)이다. Futures and promises, 영문 위키피디아

원주 필자의 불평:

Future와 Promise에 관한 docs.scala-lang.org 문서의 다음 설명은 오해를 불러일으키기 쉽고 오개념의 원인이 됩니다.

“Future는 여러 동작을 병렬적(paraell)으로 수행하는 방법을 제공합니다. 이러한 방법은 효율적이며 비봉쇄적(non-blocking)입니다.”

Future형은 비동기(asynchrony)를 기술하는 것이지, 병렬성(parallelism)을 기술하는 것이 아닙니다. 물론 Future를 사용해서 병렬처리를 하는 것도 가능하지만, 그렇다고 그것이 병렬처리만을 할 수 있다는 뜻은 아닙니다. 게다가 CPU의 성능을 최대로 끌어올릴 수 있는 방법을 찾는 사람들에게 있어서 Future는 비용이 크고 현명하지 못한 선택입니다. (4.4절 참고)

Future는 다음과 같이 두 개의 주요 연산(primary operation)과, 주요 연산을 기반으로 정의되는 여러 함수 콤비네이터들로 정의되는 인터페이스입니다.

import scala.util.Try
import scala.concurrent.ExecutionContext
 
trait Future[+T] {
  // abstract
  def value: Option[Try[T]]
 
  // abstract
  def onComplete(f: Try[T] => Unit)(implicit ec: ExecutionContext): Unit
 
  // 값 변형
  def map[U](f: T => U)(implicit ec: ExecutionContext): Future[U] = ???
 
  // 연속 실행 ;-)
  def flatMap[U](f: T => Future[U])(implicit ec: ExecutionContext): Future[U] = ???
 
  // ...
}

Future의 특성은 다음과 같습니다.

  • Eager evaluation.6) 함수의 호출자가 Future의 참조를 전달 받으면, 작업 내용과 무관하게 비동기 작업은 이미 시작된 상태입니다.
  • 메모아이제이션 (캐싱). Eager evaluation이 이루어진다는 것은, 함수가 아닌 일반적인 변수와 같이 동작하며, 결괏값은 모든 소비자(listener)에서 사용할 수 있어야 함을 의미합니다. value 속성은 결괏값을 메모아이즈하기 위해 존재합니다. 아직 연산이 완료되지 않은 경우에는 None이 할당됩니다. 당연한 이야기지만, value 속성의 def value를 호출하면 비결정론적인 결과를 얻습니다.
  • 단일한 값을 흘려보내고(stream) 나타냅니다(show). 메모아이제이션이 적용되었기 때문입니다. 따라서 작업 완료에 대한 소비자(listener)는 최대 한 번까지만 호출됩니다.

다음은 ExecutionContext에 대한 부연설명입니다.

  • ExecutionContext는 비동기 실행을 관리합니다. 스레드 풀과 유사하다고 할 수 있겠지만, 반드시 스레드 풀의 역할만을 하는 것은 아닙니다. (비동기는 멀티스레딩이나 병렬성과 다르니까요)
  • onComplete는 기본적으로 우리가 위에서 정의한 Async형과 유사하지만, ExecutionContext를 인수로 받습니다. 모든 완료 콜백은 비동기적으로 호출되어야 하기 때문입니다.
  • 모든 콤비네이터와 유틸리티들은 onComplete를 기반으로 구현되었기 때문에 ExecutionContext를 인수로 받습니다.

왜 모든 시그니처에 ExecutionContext를 붙여야 하는지 아직 이해하지 못하겠다면 3.3절을 다시 읽어보세요.

연속 실행

3장에서 만든 함수를 Future로 다시 정의해 봅시다.

import scala.concurrent.{Future, ExecutionContext}
 
def timesTwo(n: Int)(implicit ec: ExecutionContext): Future[Int] =
  Future(n * 2)
 
// 사용법
{
  import scala.concurrent.ExecutionContext.Implicits.global
 
  timesTwo(20).onComplete { result => println(s"결과: $result") }
  //=> 결과: Success(40)
}

아주 쉽습니다. Future.apply 메서드는 주어진 ExecutionContext에서 주어진 연산을 수행합니다. JVM에서 global 컨텍스트를 선택했으므로, 연산은 별개의 스레드에서 실행됩니다.

3.1절의 연속 실행은 다음과 같이 구현할 수 있습니다.

def timesFour(n: Int)(implicit ec: ExecutionContext): Future[Int] =
  for (a <- timesTwo(n); b <- timesTwo(n)) yield a + b
 
// 사용법
{
  import scala.concurrent.ExecutionContext.Implicits.global
 
  timesFour(20).onComplete { result => println(s"결과: $result") }
  //=> 결과: Success(80)
}

역시 아주 쉬워졌습니다. 위의 for-comprehension은 아래와 같이 flatMapmap으로 풀어서 쓸 수도 있습니다.

def timesFour(n: Int)(implicit ec: ExecutionContext): Future[Int] =
  timesTwo(n).flatMap { a =>
    timesTwo(n).map { b =>
      a + b
    }
  }

프로젝트에서 scala-async를 사용하면 다음과 같이 할 수도 있습니다.

import scala.async.Async.{async, await}
 
def timesFour(n: Int)(implicit ec: ExecutionContext): Future[Int] =
  async {
    val a = await(timesTwo(a))
    val b = await(timesTwo(b))
    a + b
  }

scala-async 라이브러리는 매크로를 이용해서 여러분의 코드를 flatMapmap을 이용하는 코드로 변환합니다. 다시 말해서, await는 스레드를 봉쇄(block)하지 않습니다. 그럴 것 같은 인상을 주더라도 말이죠.

scala-async는 정말로 훌륭해 보이지만, 안타깝게도 여러가지 한계가 있습니다. 예를 들어서 익명 함수 안에 있는 await는 변환되지 않습니다. 아쉽지만 Scala 코드는 보통 그런 표현식으로 넘쳐납니다. 다음과 같은 코드도 작동하지 않습니다.

// 나쁜 예시
def sum(list: List[Future[Int]])(implicit ec; ExecutionContext): Future[Int] =
  async {
    var sum = 0
    // for는 foreach로 변환되기 때문에 작동하지 않습니다
    for (f <- list) {
      sum += await(f)
    }
  }

scala-async의 접근법은 마치 1급 지속을 구현한 것과 같은 착각을 일으킵니다. 하지만 그러한 지속은 안타깝게도 1급이 아니고, 그저 컴파일러에 의해서 관리되는 코드 재작성일 뿐입니다. 그리고 이러한 한계는 C#과 ECMAScript에도 똑같이 적용됩니다. 그리고 이것은 안타깝게도 함수형 프로그래밍에서 async 키워드가 그다지 많이 쓰일 수 없음을 의미합니다.

'은 총알은 없다'는 저의 불평이 다시 떠오른다면 좋겠습니다. 😞

병렬 실행

3.2절에서와 마찬가지로 두 번의 함수 호출은 서로에 대해 독립적이기 때문에 병렬적으로 실행할 수 있습니다. 초보자에게는 계산 구문(evaluation semantics)이 조금 난해할 수 있지만, Future를 사용하면 이 작업은 더 쉬워집니다.

def timesFourInParallel(n: Int)(implicit ec: ExecutionContext): Future[Int] = {
  // Future는 eager evaluation을 수행하므로, 둘 모두 값을 합치기 전에 실행됩니다
  val fa = timesTwo(n)
  val fb = timesTwo(n)
 
  for (a <- fa; b <- fb) yield a + b
  // fa.flatMap(a => fb.map(b => a + b))
}

이러한 방법은 초보자에게는 조금 헷갈릴 수 있습니다. 작업이 병렬적(parallel)으로 실행되도록 하기 위해서는 간단히 결괏값을 합치기 이전에 Future를 생성하면 됩니다.

다른 방법으로, 임의의 컬렉션에 대해 Future.sequence를 사용할 수도 있습니다.

def timesFourInParallel(n: Int)(implicit ec: ExecutionContext): Future[Int] =
  Future.sequence(timesTwo(n) :: timesTwo(n) :: Nil).map(_.sum)

이 방법도 초보자에게는 조금 충격적일 수도 있습니다. 여기서 Future들은 그 컬렉션이 '엄격하게' 정의되었을 때에만 병렬적(parallel)으로 실행되기 때문입니다. 이때 '엄격하게'라는 말은 Scala의 Stream이나 Iterator 등으로 정의되지 않았음을 의미합니다. 이러한 표현이 초보자에게는 불명확한 탓도 있을 듯합니다.

재귀 실행

Future형은 재귀 동작에 대해서 완전히 안전합니다. (콜백 호출을 ExecutionContext에 의존하고 있기 때문입니다.) 3.3절의 예시를 다시 작성해 봅시다.

def mapBoth[A,B,R](fa: Future[A], fb: Future[B])(f: (A,B) => R)
  (implicit ec: ExecutionContext): Future[R] = {
 
  for (a <- fa; b <- fb) yield f(a,b)
}
 
def sequence[A](list: List[Future[A]])
  (implicit ec: ExecutionContext): Future[List[A]] = {
 
  val seed = Future.successful(List.empty[A])
  list.foldLeft(seed)((acc,f) => for (l <- acc; a <- f) yield a :: l)
    .map(_.reverse)
}
 
// 사용법
{
  import scala.concurrent.ExecutionContext.Implicits.global
 
  sequence(List(timesTwo(10), timesTwo(20), timesTwo(30))).foreach(println)
  // => List(20, 40, 60)
}

이번에는 StackOverflowError가 발생하지 않습니다.

val list = 0.until(10000).map(timesTwo).toList
sequence(list).foreach(r => println(s"합: ${r.sum}"))
//=> 합: 99990000

성능 고려사항

Future의 문제점은 onComplete를 호출할 때마다 ExecutionContext가 사용된다는 점입니다. 이러한 동작은 일반적으로 Runnable이 스레드 풀로 전달하는 것이고, 논리 스레드를 포크(fork)하게 됩니다. CPU-bound 작업이 많다면 Future의 구현은 성능에 재앙과도 같습니다. 스레드를 이동하는 동작은 문맥 교환(context switch)을 발생시키고 CPU 캐시의 참조 국소성(CPU cache locality)을 파괴하기 때문입니다. 물론 Future의 구현은 어느정도의 최적화를 수행하긴 합니다. 내부 콜백으로 인해서 새 스레드가 포크되지 않도록 flatMap은 내부 실행 맥락(internal execution context)을 사용합니다. 하지만 그러한 최적화로는 부족하고, 벤치마크 결과는 거짓말을 하지 않습니다.

또 메모아이제이션이 이루어진다는 것은, 작업이 완료되었을 때 생산자(producer)와 소비자(listener)에서 각각 한 번 씩 AtomicReference.compareAndSet이 호출된다는 의미입니다. 메모아이제이션이 멀티스레드 환경에서 잘 작동하도록 하는 이 호출들은 꽤 많은 비용이 듭니다.

다시 말해서, 여러분이 CPU의 성능을 최대로 끌어올려서 CPU-bound 작업을 수행하려고 한다면 FuturePromise를 사용하는 것은 그다지 좋은 생각이 아닙니다.

Scala의 FutureTask 구현의 성능 비교는, 다음의 최근 벤치마크 결과로 볼 수 있습니다.

[info] Benchmark                   (size)   Mode  Cnt     Score     Error  Units
[info] FlatMap.fs2Apply             10000  thrpt   20   291.459 ±   6.321  ops/s
[info] FlatMap.fs2Delay             10000  thrpt   20  2606.864 ±  26.442  ops/s
[info] FlatMap.fs2Now               10000  thrpt   20  3867.300 ± 541.241  ops/s
[info] FlatMap.futureApply          10000  thrpt   20   212.691 ±   9.508  ops/s
[info] FlatMap.futureSuccessful     10000  thrpt   20   418.736 ±  29.121  ops/s
[info] FlatMap.futureTrampolineEc   10000  thrpt   20   423.647 ±   8.543  ops/s
[info] FlatMap.monixApply           10000  thrpt   20   399.916 ±  15.858  ops/s
[info] FlatMap.monixDelay           10000  thrpt   20  4994.156 ±  40.014  ops/s
[info] FlatMap.monixNow             10000  thrpt   20  6253.182 ±  53.388  ops/s
[info] FlatMap.scalazApply          10000  thrpt   20   188.387 ±   2.989  ops/s
[info] FlatMap.scalazDelay          10000  thrpt   20  1794.680 ±  24.173  ops/s
[info] FlatMap.scalazNow            10000  thrpt   20  2041.300 ± 128.729  ops/s

CPU-bound 작업은 Monix Task가 Scala의 Future에 비해 압도적인 성능을 보여주고 있습니다.

원주 이 벤치마크 결과에는 한계가 있습니다. 여전히 Future가 더 빠른 경우도 있고 (Monix의 Observer는 몇 가지 합당한 이유로 인해 Future를 사용하고 있습니다), 애초에 CPU-bound 작업의 성능이 크게 중요하지 않은 경우도 자주 있습니다. 예를 들어서 I/O 작업을 할 때는 처리량과 CPU 속도가 별 상관이 없습니다.

Task와 Scala의 IO 모나드

Task는 lazy하고 비동기적으로 처리될 가능성이 있는 연산을 제어하기 위한 형입니다. Task는 특히 사이드 이펙트를 제어하고 비결정론과 콜백 지옥을 방지하는 데 유용합니다.

MonixTask 구현은 매우 정교합니다. Scalaz의 Task로부터 많은 영향을 받았기 때문에 같은 개념으로 개발되었지만, 그 구현은 다릅니다.

Task형은 Haskell의 IO 모나드로부터도 영향을 받았습니다. 필자는 Task가 Scala에서의 IO형이라고 생각합니다. 다만 이 방식이 좋은 것인지에 대해서는 논란이 있습니다.

Scalaz은 동기 작업만을 위한 별도의 IO형을 갖고 있습니다. 즉 Scalaz의 IO형은 비동기가 아니기 때문에 JVM에서 어떻게든 별도로 비동기 연산을 처리할 필요가 생깁니다. 반면 Haskell에서는 Async형이 있고, 이 Async형은 IO형으로 변환되기 때문에 런타임에 관리될 수 있습니다. (이때 그린스레드 등이 사용됩니다.)

JVM를 위한 Scalaz의 구현을 따르면, 비동기 연산을 IO형으로 나타낼 방법이 없습니다. 또 Scalaz의 구현에서는 스레드를 봉쇄(block)하지 않고도 비동기 연산을 처리할 방법이 없습니다. 그러나 스레드 봉쇄(block)는 오류를 일으키기 쉬우므로 가급적 피하는 것이 좋습니다.

요약하자면 Task형은 다음과 같은 특성이 있습니다.

  • Lazy하고 비동기적인 연산을 표현합니다.
  • 하나 혹은 여러 개의 소비자(consumer)에게 오직 하나의 값만을 전달하는 생산자(producer)를 표현합니다.
  • Lazy evaluation을 수행하기 때문에, Future와 달리 runAsync가 호출되기 전까지 어떤 실행이나 사이드 이펙트도 발생시키지 않습니다.
  • 메모아이제이션이 가능합니다.
  • 반드시 다른 논리 스레드에서 실행되어야 하는 것은 아닙니다.

특히 Monix의 구현에서 Task형은

  • 연산을 도중에 취소할 수 있습니다.
  • 그 어떤 스레드도 절대 봉쇄(block)하지 않습니다.
  • 스레드를 봉쇄(block)할 가능성이 있는 그 어떤 API도 호출하지 않습니다.
  • 모든 비동기 작업은 스택-안전(stack-safe)합니다.

Monix의 구현에서 Task형은 다음과 같은 표로 정리할 수도 있습니다.

Eager Lazy
동기 작업 A () ⇒ A
Coeval[A], IO[A]
비동기 작업 (A ⇒ Unit) ⇒ Unit (A ⇒ Unit) ⇒ Unit
Future[A] Task[A]

연속 실행

3장에서 만든 함수를 Task를 사용해서 다시 작성해 봅시다.

import monix.eval.Task
 
def timesTwo(n: Int): Task[Int] =
  Task(n * 2)
 
// 사용법
{
  // 계산되는 시점에 ExecutionContext가 필요합니다
  import monix.execution.Scheduler.Implicits.global
 
  timesTwo(20).foreach { result => println(s"결과: $result") }
  //=> 결과: 40
}

이 코드는 Future를 사용한 4.1절의 코드와 거의 똑같아 보입니다. 유일한 차이점은 timesTwo가 더이상 ExecutionContext를 인수로 받지 않는다는 점입니다. 이것은 Task 참조가 마치 함수처럼 lazy하게 계산되기 때문입니다. 실제 계산이 일어나도록하는 foreach가 호출되면 그때 비로소 결과가 출력됩니다. 그리고 바로 그때 Scheduler가 필요해집니다. Scheduler는 Monix의 향상된 ExecutionContext입니다.

이제 3.1절에서 했던 것처럼 해 봅시다.

def timesFour(n: Int): Task[Int] =
  for (a <- timesTwo(n); b <- timesTwo(n)) yield a + b
 
// Usage
{
  // 계산되는 시점에 ExecutionContext가 필요합니다
  import monix.execution.Scheduler.Implicits.global
 
  timesFour(20).foreach { result => println(s"결과: $result") }
  //=> 결과: 80
}

Future형과 마찬가지로 for-comprehension은 Scala 컴파일러에 의해서 아래와 같은 flatMapmap에 대한 호출로 변환됩니다.

def timesFour(n: Int): Task[Int] =
  timesTwo(n).flatMap { a =>
    timesTwo(n).map { b => a + b }
  }

병렬 실행

TaskFuture보다 더 수월하게 병렬성을 구현할 수 있습니다. Task는 작업을 포크할 때 더 세밀한 제어를 가능하게 할 뿐만 아니라, flatMapmap 등의 변형을 현재 스레드의 현재 호출 스택에서 수행하기 때문입니다. 이를 통해 캐시 국소성(cache locality)을 유지하고 문맥 교환(context switch)을 피할 수 있습니다.

그러나 다음과 같이 단순히 FutureTask로 바꾸는 것만으로는 부족합니다.

// 나쁜 예 (이 코드는 여전히 연속 실행됩니다)
def timesFour(n: Int): Task[Int] = {
  // Task는 lazy하게 계산되므로 여기서는 아직 계산되지 않습니다.
  val fa = timesTwo(n)
  val fb = timesTwo(n)
  // Lazy evaluation으로 인해 값이 연속으로 계산됩니다.
  for (a <- fa; b <- fb) yield a + b
}

Task를 사용한 병렬 실행은 다음과 같이 명시적으로 나타내야 합니다.

def timesFour(n: Int): Task[Int] =
  Task.mapBoth(timesTwo(n), timesTwo(n))(_ + _)

mapBoth가 왠지 익숙하지 않나요? 이렇게 하면 mapBoth가 두 작업을 동시에 시작하기 때문에 두 개의 스레드가 포크되면서 병렬적으로 값이 계산됩니다.

재귀 실행

flatMap에 대해 Task는 재귀적이면서 스택-안전하도록 설계되었습니다. 내부적으로는 트램폴린 패턴을 사용해서 매우 효율적이기도 합니다. Task는 Rúnar Bjarnason의 논문 <Stackless Scala with Free Monads>을 바탕으로 구현되었습니다.

Task를 사용한 sequence 구현은 4.3절Future를 사용한 구현과 유사하지만, 아래의 sequence는 시그니처가 lazy evaluation을 나타내고 있습니다.

def sequence[A](list: List[Task[A]]): Task[List[A]] = {
  val seed = Task.now(List.empty[A])
  list.foldLeft(seed)((acc,f) => for (l <- acc; a <- f) yield a :: l)
    .map(_.reverse)
}
 
// 사용법
{
  // 계산되는 시점에 ExecutionContext가 필요합니다
  import monix.execution.Scheduler.Implicits.global
 
  sequence(List(timesTwo(10), timesTwo(20), timesTwo(30))).foreach(println)
  // => List(20, 40, 60)
}

7)

함수형 프로그래밍과 타입 클래스

map, flatMap, mapBoth 등의 함수들을 사용할 때 우리는 더이상 “(A => Unit) => Unit“과 같은 내부 구조를 신경쓰지 않아도 됩니다. 그러한 함수들은 규칙적(lawful)이고, 순수함수적이고, 참조-투명하기 때문입니다. 이는 우리가 함수와 함수의 결과에 대해 생각할 때 그 함수가 사용되는 환경을 신경쓰지 않아도 됨을 의미합니다.

이것이 바로 Hakell의 IO가 대단한 이유입니다. Haksell은 사이드 이펙트를 '흉내내지' 않습니다. IO 값을 반환하는 함수들은 순수함수적이고 사이드이펙트를 프로그램의 주변부로 밀어냅니다. 이것은 Task에 대해서도 마찬가지입니다. Future도 eager evaluation으로 인해 조금 복잡해지긴 하지만 어쨌든 나쁘지 않은 선택입니다.

그렇다면 Task, Future, Coeval, Eval, IO, Id, Observable 등의 형을 추상화하는 인터페이스를 만들 수는 없을까요?

가능합니다. 우리는 이미 순차 실행을 추상화하는 flatMap을 보았고, 병렬 실행을 추상화하는 mapBoth를 보았습니다. 하지만 우리는 그러한 기능들을 고전적인 OOP 인터페이스로는 구현할 수 없습니다. 한 가지 이유는, Function1 인자에 대한 공변성과 반변성의 법칙(covariance and contravariance rules)에 따라 flatMap에서 형에 대한 정보가 소실되기 때문입니다. (F-바운드 다형성 타입을 사용하면 소실을 막을 수 있지만, 이 기능은 구현 재사용에 더 적합할 뿐만 아니라 다른 OOP 언어는 지원하지도 않습니다.) 그리고 그보다 더 근본적으로, OOP에서는 데이터 생성자를 메서드로 표현할 수 없기 때문입니다. (즉, OOP의 서브타입은 클래스 전체가 아니라 각각의 인스턴스에 적용되기 때문입니다.)

마침 Scala는 상류 타입(higher kinded types)을 지원하는 몇 안되는 언어에 속하고 타입 클래스를 지원하기 때문에 Haskell의 기능을 그대로 옮겨오는 것이 가능합니다. 😄 8)

원주 저자의 분노: Monad, Applicative, Functor 등의 단어를 공포스럽게 생각하거나 현실과 동떨어진 “학문적인” 개념으로 치부하는 사람이 많은 탓에 많은 저자들이 저 단어들을 사용하지 않기 위해 노력합니다. 이것은 Scala의 API 문서와 공식 튜토리얼도 마찬가지입니다.

하지만 그러한 설명 방식은 Scala와 사용자 모두에게 민폐입니다. 다른 언어에서 저 개념들은 단순히 설명하기 어려운 디자인 패턴에 불과합니다. 대부분의 다른 언어들은 형에 대한 표현성이 부족하기 때문입니다. 저 개념들을 표현할 수 있는 언어는 손에 꼽습니다. 언어 사용자의 입장에서도 문제가 생겼을 때 저 개념들을 모른 채 관련된 자료를 검색하는 것은 매우 고통스러운 일입니다.

또 저는 이것이 모르는 것에 대한 본능적인 공포에서 나오는 일종의 반지성주의라고 생각합니다. 예를 들어서 Java의 Optional형은 함자 법칙(functor law)에 위배됩니다. (즉, opt.map(f).map(g) != opt.map(f andThen g)입니다.) Swift에서는 어이없게도 5 == Some(5)입니다. Some(null)도 이상하지 않습니다. nullAnyRef의 유효한 값이기 때문입니다. 그렇지 않았더라면 Applicative[Option]를 정의하는 것이 불가능했을 것입니다. 이 예시들을 어떻게 반지성주의자들에게 설명할 수 있겠습니까.

Monad (연속 실행과 재귀 실행)

이 글의 목적은 모나드에 대해 설명하는 것이 아닙니다. 모나드에 관해서는 다른 좋은 글들이 있습니다. 모나드는 잘 모르지만 비동기 프로그래밍에 대한 감을 쌓기 위해서 이 글을 읽고 있다면 적어도 알아두어야 할 것이 있습니다. FutureTask 등의 자료형을 사용함에 있어서 모나드는 실행의 순서를 올바르게 표현하는 유일한 방법이라는 사실입니다.

관찰 결과: 명령형 언어(imperative language)로 병행성(concurrency)을 다루는 프로그래머들은 ”;”이 순서를 정의한다는 맹목적인 믿음으로 인해 함정에 빠지고 만다. Aleksey Shipilëv

Monad형을 Scala로 표현해 봅시다.

// 이 줄을 적지 않아도 된다면 좋을텐데 말이죠 :-(
import scala.language.higherKinds
 
trait Monad[F[_]] {
  /** 생성자 (`A`를 `F[A]` Monad로 리프팅합니다.)
    * `Applicative`의 일부이기도 합니다. 아래를 보세요.
    */
  def pure[A](a: A): F[A]
 
  /** 빰 */
  def flatMap[A,B](fa: F[A])(f: A => F[B]): F[B]
}

이어서 Future를 구현합시다.

import scala.concurrent._
 
// Supplying an instance for Future isn't clean, ExecutionContext needed
class FutureMonad(implicit ec: ExecutionContext)
  extends Monad[Future] {
 
  def pure[A](a: A): Future[A] =
    Future.successful(a)
 
  def flatMap[A,B](fa: Future[A])(f: A => Future[B]): Future[B] =
    fa.flatMap(f)
}
 
object FutureMonad {
  implicit def instance(implicit ec: ExecutionContext): FutureMonad =
    new FutureMonad
}

FutureMonad형은 무척 강력한 도구입니다. Task, Future, IO 뿐만 아니라 무엇이든지 FutureMonad와 함께 사용할 수 있습니다. 다음과 같이 flatMap을 스택-안전하는 것도 가능합니다.

/** 피보나치 수열의 N번째 숫자를 구합니다. */
def fib[F[_]](n: Int)(implicit F: Monad[F]): F[BigInt] = {
  def loop(n: Int, a: BigInt, b: BigInt): F[BigInt] =
    F.flatMap(F.pure(n)) { n =>
      if (n <= 1) F.pure(b)
      else loop(n - 1, b, a + b)
    }
 
  loop(n, BigInt(0), BigInt(1))
}
 
// 사용법
{
  // 유효범위(scope)를 만듭니다.
  import FutureMonad.instance
  import scala.concurrent.ExecutionContext.Implicits.global
 
  // 실행
  fib[Future](40).foreach(r => println(s"결과: $r"))
  //=> 결과: 102334155
}
원주 이 코드는 그저 장난 수준입니다. 본격적인 프로젝트를 보고 싶다면 Typelevel의 Typelevel's Cats를 참고하세요.

Applicative (병렬 실행)

모나드는 실행의 순서를 정의합니다. 하지만 동시에 실행 가능한 서로 독립적인 연산들의 결과를 합쳐야 할 때도 있습니다. 그런 경우에는 Monad보다 Applicative가 적절할 수 있습니다.

먼저 간단한 타입백과사전(Typeclassopedia)을 만들어 봅시다. 9)

trait Functor[F[_]] {
  /** 이 코드는 독자 여러분 모두 이해하리라고 믿습니다. */
  def map[A,B](fa: F[A])(f: A => B): F[B]
}
 
trait Applicative[F[_]] extends Functor[F] {
  /** 생성자 (`A`를 `F[A]` Applicative로 리프팅합니다.) */
  def pure[A](a: A): F[A]
 
  /** 두 참조에 대해서 동시에 map을 실행합니다.
    *
    * 다른 구현체에서는 Applicative 연산자가 `ap`일 수 있습니다.
    * 하지만 `map2` 구현이 훨씬 이해하기 쉽습니다.
    */
  def map2[A,B,R](fa: F[A], fb: F[B])(f: (A,B) => R): F[R]
}
 
trait Monad[F[_]] extends Applicative[F] {
  def flatMap[A,B](fa: F[A])(f: A => F[B]): F[B]
}

이어서 Future를 구현합니다.

class FutureMonad(implicit ec: ExecutionContext)
  extends Monad[Future] {
 
  def pure[A](a: A): Future[A] =
    Future.successful(a)
 
  def flatMap[A,B](fa: Future[A])(f: A => Future[B]): Future[B] =
    fa.flatMap(f)
 
  def map2[A,B,R](fa: Future[A], fb: Future[B])(f: (A,B) => R): Future[R] =
    // flatMap에 기반하지 않은 구현체를 위한 함수이지만,
    // Task는 해당되지 않습니다 ;-)
    for (a <- fa; b <- fb) yield f(a,b)
}
 
object FutureMonad {
  implicit def instance(implicit ec: ExecutionContext): FutureMonad =
    new FutureMonad
}

이제 Future, Task 등과 사용할 수 있는 제네릭 함수를 Application에 기반해서 정의할 수 있습니다.

def sequence[F[_], A](list: List[F[A]])
  (implicit F: Applicative[F]): F[List[A]] = {
 
  val seed = F.pure(List.empty[A])
  val r = list.foldLeft(seed)((acc,e) => F.map2(acc,e)((l,a) => a :: l))
  F.map(r)(_.reverse)
}
원주 다시 한 번 강조하지만, 이 코드는 단지 장난 수준일 뿐입니다. 본격적인 프로젝트를 보고 싶다면 Typelevel의 Cats를 참고하세요.

비동기 계산을 위한 타입 클래스를 정의할 수 있을까요?

지금까지 살펴 본 내용에서 빠진 부분은 실제로 계산을 시작하고 결괏값을 받아오는 방법입니다. Scala의 Future를 생각해 보면, 우리는 onComplete를 추상화해야 하는 것입니다. Monix의 Task로 치자면 runAsync를 추상화하는 것이기도 합니다. Haskell이나 Scalaz의 IO에서는 unsafePerformIO입니다.

FS2에서는 Effect라는 타입 클래스를 정의했습니다. 다음은 그 구현을 간략화한 코드입니다.

trait Effect[F[_]] extends Monad[F] {
  def unsafeRunAsync[A](fa: F[A])(cb: Try[A] => Unit): Unit
}

우리가 처음에 만들었던 Async형과 비슷한 것 같기도 합니다. Future.onComplete, Task.runAsync와는 더욱 비슷합니다. IO.unsafePerformIO에 적용할 수 있을 것 같기도 합니다.

하지만 이것은 다음의 이유로 진짜 타입 클래스라고 하기는 어렵습니다.

  • 이 구현은 불규칙적(lawless)입니다. 그러나 이것만으로 타입 클래스가 아니라고 하기는 곤란합니다. (Show와 같이 유용하지만 불규칙적인 타입 클래스도 분명 존재합니다.) 그보다는 더 중요한 이유가 있습니다.
  • 3.3절에서 본 것과 같은 '분노의 StackOverflowError'를 피하기 위해서는 실행 문맥(execution context)이나 스레드풀을 지정해야 하기 때문입니다.

실행 문맥(execution context)을 지정하는 방법은 구현 방식에 따라 다릅니다. Java는 Executor를 사용합니다. Scala의 FutureExecutionContext를 쓰고, Monix는 ExecutionContext를 향상시킨 Scheduler를 사용합니다. FS2와 Scalaz는 스레드를 포크(fork)할 때 Executor의 래퍼(wrapper)인 Strategy를 사용하지만 unsafePerformIOrunAsync가 호출될 때 실행 문맥(execution context)을 변경하지는 않습니다. (그래서 사실 Scalaz의 많은 콤비네이터들은 안전하지 않습니다.)

우리는 이와 같은 전략을 Future에 대해서도 취할 수 있습니다. 유효범위(scope)로부터 implicit whatever: Context를 받아서 타입 클래스의 인스턴스를 생성하는 방식입니다. 하지만 이러한 방식은 약간 이상하고 비효율적입니다. 게다가 이 방식을 따르면 Effect.unsafePerformIO만으로 flatMap을 정의할 수 없고, 그렇다면 그 인스턴스가 반드시 Monad를 상속한다고 보기울 것입니다.

그래서 처음 주어진 질문(“비동기 계산을 위한 타입 클래스를 정의할 수 있을까요?”)에 대해 저는 확신이 없습니다. Cats에 도입할 수 있는 아이디어가 있다면 저도 꼭 듣고 싶습니다.

여러분이 이 사고 실험을 재밌게 즐기셨다면 좋겠습니다. 설계는 늘 즐겁죠. 😎

올바른 도구를 고르자

어떤 추상화는 다른 추상화보다 더 일반적인 목적을 가지고 있습니다. 저는 개인적으로 “특정한 작업에는 특정한 최적의 도구가 있다”는 사고방식이 잘못된 선택을 정당화하기 위해 남용되고 있다고 생각합니다.

이 시점에서 우리는 Rúnar Bjarnason의 <제약은 자유이며, 자유는 제약이다>를 보아야 합니다. 병행성(concurrency)을 추상화하는 작업의 핵심을 정확하게 짚는 멋진 발표입니다.

이미 말했듯이, 병행성(concurrency)을 처리함에 있어 모든 상황에 일반적으로 적용될 수 있는 '은 총알'은 없습니다. 더 고수준의 추상화일수록, 문제를 해결하는 데 필요한 유효범위(scope)는 좁아집니다. 그리고 모델의 유효범위가 좁아지고 기능이 줄어들수록, 그 모델은 간단해지고 결과를 합치기도 쉬워집니다. 예를 들어서 Scala 커뮤니티의 개발자들은 Akka의 행위자 모델(actor model)을 과도하게 사용하는 경향이 있습니다. (물론 Akka는 잘못 쓰지만 않는다면 훌륭한 라이브러리입니다.) 그러나 FutureTask로도 같은 기능을 구현할 수 있다면 Akka의 Actor는 굳이 사용하지 않는 편이 낫습니다. Monix와 ReactiveX의 Observable 등과 같은 다른 추상화도 마찬가지입니다.

그리고 이 두 가지 규칙을 마음에 새기도록 합시다.

  • 콜백, 스레드와 스레드 봉쇄(block)를 지양하자. 오류를 일으키기 쉽고 결과를 합치는 것도 불가능하다.
  • 병행성(concurrency)을 지양하자. 그것은 전염병과 같다.

마지막으로 이것만 말하겠습니다. 병행성(concurrency) 전문가들은 애초에 병행성이 생기지 않도록 하는 데 그 누구보다 전문적입니다. 💀

1)
역주 이 문서는 Monix의 개발자 Alexandru Nedelcu님이 작성한 <Asynchronous Programming and Scala>를 번역한 문서입니다.
2)
역주 대부분의 JavaScript 런타임에서 비지니스 로직은 싱글스레드로 작동합니다. 비동기 코드가 반드시 멀티스레드 환경에서 작동하는 것은 아니라는 뜻이죠.
3)
역주 CPU-bound 작업이란, 처리 속도가 주로 CPU의 속도에 따라 결정되는 작업을 의미합니다. IO-bound, Memory-bound 등의 용어도 있습니다.
4)
역주 '비동기 범위'라는 용어가 모호하게 느껴진다면 Reactive Streams에 등록된 관련 이슈를 읽어보면 좋습니다.
5)
역주 JavaScript에서의 setTimeout을 사용한 스케줄링에 관해서는 역자가 제6회 D2 CAMPUS SEMINAR에서 자세하게 발표했었습니다. 관련 자료는 D2 공식 사이트에 등록되어 있습니다.
6)
역주 Eager evaluation은 한국어로 '조급한 계산법', '즉시 평가' 등으로 번역되곤 합니다. 아쉽지만 어떤 단어도 원어의 의미를 충분히 살리면서 널리 쓰이고 있지 않기 때문에 부득이하게 원어를 그대로 사용했습니다. 동의어로는 'strict evaluation'이 있으며, 반의어로는 'lazy evaluation'이 있습니다.
7)
역주 이 아래부터 6장에서는, 상당히 높은 수준의 함수형 프로그래밍 이론 지식이 요구되는 사고 실험이 진행됩니다. 특히 Haskell에 익숙하지 않다면 바로 7장으로 건너 뛰는 것도 나쁘지 않을 듯합니다.
8)
역주 'higher kinded types'를 어떻게 번역해야 좋을지 난처하던 차에, '상류 타입'이라는 번역어를 한국어판 Scala School에서 빌려왔습니다. 해당 문서에도 '상류 타입'이라는 번역어에 대한 고민이 자세하게 역주로 붙어 있습니다. 이외에도 다른 OOP 언어에서 잘 쓰이지 않는 Scala 특유의 개념들은 대부분 한국어판 Scala School의 번역을 참고했습니다.
9)
역주 '타입백과사전'은 'Typeclassopedia'의 번역입니다. 첫 글자가 대문자인 것으로 보아 Haskell의 타입백과사전을 의식한 것으로 보입니다. 다양한 형들의 구현과 관계를 나타낸 코드나 문서를 타입백과사전이라고 부릅니다.
비동기_프로그래밍과_scala.txt · 마지막으로 수정됨: 2020/08/09 07:39 저자 cumul0529