사용자 도구

사이트 도구


비동기_프로그래밍과_scala

문서의 이전 판입니다!


비동기 프로그래밍과 Scala

비동기(asynchrony)는 어떤 곳에서도 사용될 수 있는, 병행성(concurrency)을 포괄하는 개념입니다. 이 글은 비동기 처리가 무엇인지, 어떤 문제를 해결해야 하는지에 관해 다룹니다. 1)

소개

비동기는 멀티스레딩(multithreading)보다 넓은 개념임에도 불구하고, 일부 개발자들은 비동기와 멀티스레딩을 혼동합니다. 비동기와 멀티스레딩의 관계는 다음과 같습니다.

Multithreading <: Asynchrony

비동기 연산은 다음과 같이 형(type)을 통해서 표현할 수도 있습니다.

type Async[A] = (Try[A] => Unit) => Unit

여러 개의 Unit 반환형(return type)이 더럽다고 느껴지는 이유는, 바로 비동기가 더럽기 때문입니다. 다음과 같은 특성을 갖는 모든 작업, 프로세스, 네트워크 상의 한 노드를 비동기 연산이라고 할 수 있습니다.

  1. 프로그램의 메인 플로우 밖에서 실행되거나, (호출자의 입장에서) 현재의 호출 스택에서 실행되지 않음
  2. 작업이 끝나고 나면 실행되는 콜백을 사용함
  3. 결과가 언제 나올 것이라는 보장도, 결과가 결국 나오긴 나오는지도 보장할 수 없음

비동기가 병행성(concurrency)을 포괄하긴 하지만, 그것이 필연적으로 멀티스레딩을 포괄하는 것이 아님은 매우 중요합니다. 예를 들어, JavaScript에서는 대부분의 I/O 동작이 비동기적일 뿐만 아니라, 아주 무거운 비지니스 로직도 (setTimeout에 기반한 스케쥴링을 통해) 비동기적으로 처리될 수 있기 때문에, 인터페이스가 계속해서 응답할 수 있습니다. 2)

프로그램에 비동기를 도입하는 일은, 병행성 문제(concurrency problems)를 일으킵니다. 우리는 비동기 연산이 언제 끝날지 절대로 알 수 없어서, 동시에 작동한 여러 개의 비동기 연산의 결과를 모으는 일은 동기화(synchronization)를 필요로 하게 되기 때문입니다. 동기화 작업은 프로그램이 더 이상 동작의 순서에 의존하지 않도 하는 작업이기도 한데, 순서로부터의 독립은 바로 비결정론적 알고리즘의 핵심 요소이기도 합니다.

영문 위키피디아, Nondeterministic algorithm: 비결정론적(nondeterministic) 알고리즘은 같은 입력에 대해서도 새로운 실행에서는 이전과 다르게 동작할 수 있는 알고리즘을 가리킨다. 결정론적(deterministic) 알고리즘과 반대 관계이다. … 병행(concurrent) 알고리즘은 경쟁 상태(race condition)로 인해 새로운 실행에서 이전과 다르게 동작할 수 있다.

실력 있는 독자라면, 사용되는 방식과 환경에 따라서 조금씩의 차이는 있지만 이러한 종류의 문제가 어디에서나 발견된다는 사실을 눈치 챌 수 있습니다.

위의 모든 추상화들은 비동기를 처리하는 조금 나은 방법들입니다.

커다란 허상

비동기적인 결과를 동기적인 결과로 변환하는 함수는 흔히 아래와 같이 기술됩니다.

def await[A](fa: Async[A]): A

그러나 우리는 비동기 처리가 다른 일반적인 함수들과 같다고 가정해서는 안됩니다. 그 이유는 CORBA의 실패로부터도 배울 수 있습니다.

비동기 처리에 있어서 우리는 흔히 분산 컴퓨팅의 함정 (fallacies of distributed computing)에 빠지곤 합니다.

  1. 네트워크는 신뢰할 수 있다 (The network is reliable)
  2. 레이턴시는 존재하지 않는다 (Latency is zero)
  3. 대역폭은 무한하다 (Bandwidth is infinite)
  4. 네트워크의 보안은 완벽하다 (The network is secure)
  5. 네트워크 토폴로지는 변하지 않는다 (-Topology doesn't change)
  6. 관리자는 한 명 뿐이다 (There is one administrator)
  7. 전송 비용은 없다 (Transport cost is zero)
  8. 네트워크는 동질적이다 (The network is homogeneous)

당연히 위의 명제는 모두 거짓입니다. 다시 말해, 우리는 흔히 네트워크 상에서 발생한 오류, 레이턴시, 패킷 손실, 대역폭 제한을 충분히 고려하지 않은 코드를 작성한다는 의미입니다.

개발자들은 아래와 같은 방식으로 이러한 문제에 대처해 왔습니다.

이렇게 해결법이 많다는 것은, 그 중에 무엇도 일반적인 목적으로 비동기를 처리하기에 적합하지 않다는 것을 의미합니다. 메모리 관리와 병렬성(concurrency) 처리는 우리 소프트웨어 개발자들이 마주하고 있는 가장 큰 문제라는 점에서, '은 총알은 없다' 딜레마와도 관련이 있습니다.

원주: 경고 - 개인적인 의견과 불평입니다.

어떤 사람들은 Golang과 같은 M:N 플랫폼을 자랑합니다만, 저는 JVM이나 .Net과 같은 1:1 멀티스레딩 플랫폼을 좋아합니다.

프로그래밍 언어에 충분한 표현성(Scala의 Future와 Promise, Task, Closure의 core.async)이 있는 한, 우리는 1:1 멀티스레딩 플랫폼 위에 M:N 플랫폼도 구현할 수 있기 때문입니다. 하지만 M:N 런타임에서 M:N 멀티스레딩에 문제가 발생했을 때는, 플랫폼을 바꾸지 않고서는 문제를 해결할 수 없습니다. 그리고 실제로 대부분의 M:N 플랫폼은 자주 문제가 생깁니다.

모든 가능한 해결법을 배우고 결정을 내리는 것은 실로 엄청나게 고통스러운 작업이지만, 그래도 충분히 알지 못하고 결정을 내리는 것보다는 낫습니다. 이 문제에 한해서는 TOOWTDI 혹은 “나쁜 기능이 더 낫다”를 신념으로 갖지 않는 것이 좋습니다. Scala나 Haskell과 같은 표현적인(expressive) 신생 언어의 학습 난이도가 너무 높다는 불평은 문제의 요점과 떨어져 있습니다. 새로운 언어를 배우는 것은 병행성(concurreny)을 다루는 데에 있어서 가장 사소한 문제이기 때문입니다. 저는 심지어 병행성의 문제를 해결하지 못해 소프트웨어 업계를 떠난 사람들도 알고 있습니다.

콜백 지옥

예를 들어서, 두 개의 비동기 작업을 시작하고 그 결과들을 합친다고 합시다.

먼저 작업을 비동기적으로 실행하는 함수를 정의합니다.

import scala.concurrent.ExecutionContext.global
 
type Async[A] = (A => Unit) => Unit
 
def timesTwo(n: Int): Async[Int] =
  onFinish => {
    global.execute(new Runnable {
      def run(): Unit = {
        val result = n * 2
        onFinish(result)
      }
    })
  }
 
// 사용법
timesTwo(20) { result => println(s"결과: $result") }
//=> 결과: 40

연속 실행 (side-effect의 연옥)

하나의 실행이 끝나면 다음을 연속해서 실행하는 방법으로 다음과 같이 두 개의 비동기 결괏값을 합칠 수 있습니다.

def timesFour(n: Int): Async[Int] =
  onFinish => {
    timesTwo(n) { a =>
      timesTwo(n) { b =>
        // 두 개의 결과를 합친다
        onFinish(a + b)
      }
    }
  }
 
// 사용법
timesFour(20) { result => println(s"결과: $result") }
//=> 결과: 80

두 개의 결괏값을 합치는 정도는 간단합니다.

하지만 비동기성은 관련된 모든 것들에 영향을 미친다는 문제가 있습니다. 여러분이 다음과 같은 순수 함수를 만들었다고 합시다.

def timesFour(n: Int): Int = n * 4

하지만 EJB를 사용하려는 EA가 여러분에게 순수 함수가 아닌 비동기 timesTwo 함수를 사용하라고 지시합니다. 이제 여러분의 timesFour 구현은 순수한 수학적 함수에서 side-effect가 넘쳐나는 함수가 될 수밖에 없습니다. 충분히 좋은 Async형이 없다면 우리는 side-effect가 넘쳐나는 콜백으로 이루어진 작업 흐름을 다뤄야 합니다. 물론 결괏값이 나올 때까지 봉쇄(block)하는 것도 도움이 될 수 없습니다. 그저 문제를 숨기는 것일 뿐이니까요. 2장을 보면 이유를 알 수 있습니다.

게다가 문제는 점점 더 복잡해집니다. 😷

병렬 실행 (비결정론의 림보)

위의 '연속 실행'에서의 두 번째 실행은 첫 번째 실행에 의존하지 않습니다. 즉, 두 실행은 병렬적(parallel)으로 처리될 수 있습니다. JVM에서는 CPU-bound 작업3)들을 병렬적으로 실행할 수 있고, JavaScript에서도 Ajax 요청을 보내거나 Web Worker API를 사용할 때 병렬 처리가 가능합니다.

안타깝게도 병렬 실행에서는 일이 조금 복잡해집니다. 아래와 같은 순진한 접근 방법은 완전히 틀렸습니다.

// 영 좋지 못한 예시
 
def timesFourInParallel(n: Int): Async[Int] =
  onFinish => {
    var cacheA = 0
 
    timesTwo(n) { a => cacheA = a }
 
    timesTwo(n) { b =>
      // 두 개의 결과를 합친다
      onFinish(cacheA + b)
    }
  }
 
timesFourInParallel(20) { result => println(s"결과: $result") }
//=> 결과: 80
 
timesFourInParallel(20) { result => println(s"결과: $result") }
//=> 결과: 40

바로 이것이 비결정론의 실례(實例)입니다. 두 개의 작업이 종료되는 순서가 보장되지 않기 때문에 우리는 동기화를 수행하는 소규모의 상태 기계(state machine)를 구현해야 합니다.

먼저 상태 기계의 ADT를 정의합니다.

// 상태 기계 정의
sealed trait State
// 초기 상태
case object Start extends State
// B가 종료되어 A를 대기하는 상태
final case class WaitForA(b: Int) extends State
// A가 종료되어 B를 대기하는 상태
final case class WaitForB(a: Int) extends State

다음으로, 상태 기계를 비동기적으로 구현합니다.

// JVM에서는 영 좋지 못한 예시 (JavaScript에서는 돌아감)
 
def timesFourInParallel(n: Int): Async[Int] = {
  onFinish => {
    var state: State = Start
 
    timesTwo(n) { a =>
      state match {
        case Start =>
          state = WaitForB(a)
        case WaitForA(b) =>
          onFinish(a + b)
        case WaitForB(_) =>
          // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록
          throw new IllegalStateException(state.toString)
      }
    }
 
    timesTwo(n) { b =>
      state match {
        case Start =>
          state = WaitForA(b)
        case WaitForB(a) =>
          onFinish(a + b)
        case WaitForA(_) =>
          // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록
          throw new IllegalStateException(state.toString)
      }
    }
  }
}

위의 해결법을 시각화하면 아래와 같은 상태 기계로 나타낼 수 있습니다.

하지만 아직 문제가 남아있습니다. JVM은 1:1 멀티스레딩 플랫폼이기 때문에 공유 메모리에 병행적으로 접근(shared memory concurrency)하게 됩니다. 따라서 state에 대한 접근이 동기화되어야 합니다.

사용할 수 있는 한 가지 해결 방법은 synchronized 블록을 사용하는 것입니다. 이 방법은 '암묵적인 락(intrinsic lock)'이라고도 부릅니다.

// 모니터처럼 작동할 수 있는 공통 참조 생성
val lock = new AnyRef
var state: State = Start
 
timesTwo(n) { a =>
  lock.synchronized {
    state match {
      case Start =>
        state = WaitForB(a)
      case WaitForA(b) =>
        onFinish(a + b)
      case WaitForB(_) =>
        // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록
        throw new IllegalStateException(state.toString)
    }
  }
}
 
//...

이러한 고수준의 락(high-level lock)은 여러 개의 스레드가 병렬적(parallel)으로 (위의 state와 같은) 리소스에 접근하지 못하도록 보호합니다. 하지만 저는 개인적으로 고수준 락의 사용을 피합니다. 커널의 스케쥴러는 임의의 스레드를 임의의 이유로 정지시킬 수 있는데, 락을 갖고 있는 스레드가 커널에 의해 정지되면 다른 모든 스레드가 정지되기 때문입니다. 정지를 피하고 끊임 없이 작업을 실행하기 위해서는 비봉쇄 알고리즘(non-blocking logic)이 더 낫습니다.

그래서 이 경우에는 AtomicReference를 사용하는 것이 정답입니다.

// JVM에서 잘 돌아가는 예시
 
import scala.annotation.tailrec
import java.util.concurrent.atomic.AtomicReference
 
def timesFourInParallel(n: Int): Async[Int] = {
  onFinish => {
    val state = new AtomicReference[State](Start)
 
    @tailrec def onValueA(a: Int): Unit =
      state.get match {
        case Start =>
          if (!state.compareAndSet(Start, WaitForB(a)))
            onValueA(a) // 재시도
        case WaitForA(b) =>
          onFinish(a + b)
        case WaitForB(_) =>
          // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록
          throw new IllegalStateException(state.toString)
      }
 
    timesTwo(n)(onValueA)
 
    @tailrec def onValueB(b: Int): Unit =
      state.get match {
        case Start =>
          if (!state.compareAndSet(Start, WaitForA(b)))
            onValueB(b) // 재시도
        case WaitForB(a) =>
          onFinish(a + b)
        case WaitForA(_) =>
          // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록
          throw new IllegalStateException(state.toString)
      }
 
    timesTwo(n)(onValueB)
  }
}
원주: MonixAtomic형을 사용하면 코드를 JavaScript / Scala.js에서 크로스컴파일 할거나, 성능 향상을 위한 도구와 AtomicReference와 관련된 훌륭한 유틸리티들을 사용할 수 있습니다.

이제 조금 어려워졌나요? 조금 더 힘내봅시다.

재귀 실행 (분노의 StackOverflow)

위의 onFinish의 호출이 스택-안전하지 않고(stack-unsafe), 호출 시점에서 비동기 범위(asynchronous boundary)를 강제하지 않으면 프로그램이 StackOverflowError로 터질 수 있다고 한다면, 어떤 생각이 드시나요? 4)

말 그대로의 의미입니다. 앞서 작성한 코드를 제네릭한 방법(generic way)으로 다시 작성해 봅시다.

import scala.annotation.tailrec
import java.util.concurrent.atomic.AtomicReference
 
type Async[+A] = (A => Unit) => Unit
 
def mapBoth[A,B,R](fa: Async[A], fb: Async[B])(f: (A,B) => R): Async[R] = {
  // 상태 기계 정의
  sealed trait State[+A,+B]
  // 초기 상태
  case object Start extends State[Nothing, Nothing]
  // B가 종료되어 A를 대기하는 상태
  final case class WaitForA[+B](b: B) extends State[Nothing,B]
  // A가 종료되어 B를 대기하는 상태
  final case class WaitForB[+A](a: A) extends State[A,Nothing]
 
  onFinish => {
    val state = new AtomicReference[State[A,B]](Start)
 
    @tailrec def onValueA(a: A): Unit =
      state.get match {
        case Start =>
          if (!state.compareAndSet(Start, WaitForB(a)))
            onValueA(a) // 재시도
        case WaitForA(b) =>
          onFinish(f(a,b))
        case WaitForB(_) =>
          // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록
          throw new IllegalStateException(state.toString)
      }
 
    @tailrec def onValueB(b: B): Unit =
      state.get match {
        case Start =>
          if (!state.compareAndSet(Start, WaitForA(b)))
            onValueB(b) // retry
        case WaitForB(a) =>
          onFinish(f(a,b))
        case WaitForA(_) =>
          // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록
          throw new IllegalStateException(state.toString)
      }
 
    fa(onValueA)
    fb(onValueB)
  }
}

그리고 Scala의 Future.sequence와 유사한 기능을 구현합시다. 우리의 의지는 강인하고 용기는 무한하니까요. 😇

def sequence[A](list: List[Async[A]]): Async[List[A]] = {
  @tailrec def loop(list: List[Async[A]], acc: Async[List[A]]): Async[List[A]] =
    list match {
      case Nil =>
        onFinish => acc(r => onFinish(r.reverse))
      case x :: xs =>
        val update = mapBoth(x, acc)(_ :: _)
        loop(xs, update)
    }
 
  val empty: Async[List[A]] = _(Nil)
  loop(list, empty)
}
 
// 호출
sequence(List(timesTwo(10), timesTwo(20), timesTwo(30))) { r =>
  println(s"결과: $r")
}
//=> 결과: List(20, 40, 60)

이것으로 끝났을까요? 아닙니다.

val list = 0.until(10000).map(timesTwo).toList
sequence(list)(r => println(s"합: ${r.sum}"))

자, 위의 코드를 실행시키고 영광스러운 메모리 오류를 목도하십시오. 프로덕션 환경이었다면 아래의 오류는 Scala의 NonFatal로 처리되지도 않아서 프로그램이 크래시를 일으켰을 겁니다.

java.lang.StackOverflowError
  at java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:2414)
  at java.util.concurrent.ForkJoinPool.execute(ForkJoinPool.java:2630)
  at scala.concurrent.impl.ExecutionContextImpl$$anon$3.execute(ExecutionContextImpl.scala:131)
  at scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:20)
  at .$anonfun$timesTwo$1(<pastie>:27)
  at .$anonfun$timesTwo$1$adapted(<pastie>:26)
  at .$anonfun$mapBoth$1(<pastie>:66)
  at .$anonfun$mapBoth$1$adapted(<pastie>:40)
  at .$anonfun$mapBoth$1(<pastie>:67)
  at .$anonfun$mapBoth$1$adapted(<pastie>:40)
  at .$anonfun$mapBoth$1(<pastie>:67)
  at .$anonfun$mapBoth$1$adapted(<pastie>:40)
  at .$anonfun$mapBoth$1(<pastie>:67)

앞서 말씀드린 것과 같이, 강제된 비동기 범위 없이 실행된 onFinish는 스택 오버플로우를 유발할 수 있습니다. JavaScript에서는 setTimeout을 사용한 스케쥴링으로 문제를 해결할 수 있으며, JVM에서는 스레드 풀을 사용하거나 Scala의 ExecutionContext를 사용할 수 있습니다. 5)

Future와 Promise

scala.concurrent.Future는 엄격하게 계산되는 비동기 연산을 기술하는 방법입니다. 지금까지 위에서 사용한 Async형과도 유사합니다.

Futures and promises, 영문 위키피디아: Future와 Promise는 일부 병행 프로그래밍 언어(concurrent programming language)에서 프로그램의 실행을 동기화하기 위해서 사용하는 생성자이다. Future와 Promise가 기술하는 객체는 연산이 아직 완료되지 않아 초기에는 계산할 수 없는 값에 대한 대리자(proxy)이다.
원주: 필자의 불평입니다.

Future와 Promise에 관한 docs.scala-lang.org문서의 다음 설명은 오해를 불러일으키기 쉽고 오개념의 원인이 됩니다.

“Future는 여러 동작을 병렬적(paraell)으로 수행하는 방법을 제공합니다. 이러한 방법은 효율적이며 비봉쇄적(non-blocking)입니다.”

Future형은 비동기(asynchrony)를 기술하는 것이지, 병렬성(parallelism)을 기술하는 것이 아닙니다. 물론 Future를 사용해서 병렬처리를 하는 것도 가능하지만, 그렇다고 그것이 병렬처리만을 할 수 있다는 뜻은 아닙니다. 게다가 CPU의 성능을 최대로 끌어올릴 수 있는 방법을 찾는 사람들에게 있어서 Future는 비용이 크고 현명하지 못한 선택입니다. (4.4절 참고)

Future는 다음과 같이 두 개의 주요 연산(primary operation)과, 주요 연산을 기반으로 정의되는 여러 함수 콤비네이터들로 정의되는 인터페이스입니다.

import scala.util.Try
import scala.concurrent.ExecutionContext
 
trait Future[+T] {
  // abstract
  def value: Option[Try[T]]
 
  // abstract
  def onComplete(f: Try[T] => Unit)(implicit ec: ExecutionContext): Unit
 
  // 값 변형
  def map[U](f: T => U)(implicit ec: ExecutionContext): Future[U] = ???
  // 연속 실행 ;-)
  def flatMap[U](f: T => Future[U])(implicit ec: ExecutionContext): Future[U] = ???
  // ...
}

Future의 특성은 다음과 같습니다.

  • 조급한 계산 (엄격한 계산)6). 함수의 호출자가 Future의 참조를 전달 받으면, 작업 내용과 무관하게 비동기 작업은 이미 시작된 상태입니다.
  • 메모아이제이션 (캐싱). 조급한 계산이 이루어진다는 것은, 함수가 아닌 일반적인 변수와 같이 동작하며, 결괏값은 모든 소비자(listener)에서 사용할 수 있어야 함을 의미합니다. value 속성은 결괏값을 메모아이즈하기 위해 존재합니다. 아직 연산이 완료되지 않은 경우에는 None이 할당됩니다. 당연한 이야기지만, value 속성의 def value를 호출하면 비결정론적인 결과를 얻습니다.
  • 단일한 값을 흘려보내고(stream) 나타냅니다(show). 메모아이제이션이 적용되었기 때문입니다. 따라서 작업 완료에 대한 소비자(listener)는 최대 한 번까지만 호출됩니다.

다음은 ExecutionContext에 대한 부연설명입니다.

  • ExecutionContext는 비동기 실행을 관리합니다. 스레드 풀과 유사하다고 할 수 있겠지만, 반드시 스레드 풀의 역할만을 하는 것은 아닙니다. (비동기는 멀티스레딩이나 병렬성과 다르니까요)
  • onComplete는 기본적으로 우리가 위에서 정의한 Async형과 유사하지만, ExecutionContext를 인수로 받습니다. 모든 완료 콜백은 비동기적으로 호출되어야 하기 때문입니다.
  • 모든 콤비네이터와 유틸리티들은 onComplete를 기반으로 구현되었기 때문에 ExecutionContext를 인수로 받습니다.

왜 모든 시그니처에 ExecutionContext를 붙여야 하는지 아직 이해하지 못하겠다면 3.3절을 다시 읽어보세요.

연속 실행

3장에서 만든 함수를 Future로 다시 정의해 봅시다.

import scala.concurrent.{Future, ExecutionContext}
 
def timesTwo(n: Int)(implicit ec: ExecutionContext): Future[Int] =
  Future(n * 2)
 
// 사용법
{
  import scala.concurrent.ExecutionContext.Implicits.global
 
  timesTwo(20).onComplete { result => println(s"결과: $result") }
  //=> 결과: Success(40)
}

아주 쉽습니다. Future.apply 메서드는 주어진 ExecutionContext에서 주어진 연산을 수행합니다. JVM에서 global 컨텍스트를 선택했으므로, 연산은 별개의 스레드에서 실행됩니다.

3.1절의 연속 실행은 다음과 같이 구현할 수 있습니다.

def timesFour(n: Int)(implicit ec: ExecutionContext): Future[Int] =
  for (a <- timesTwo(n); b <- timesTwo(n)) yield a + b
 
// 사용법
{
  import scala.concurrent.ExecutionContext.Implicits.global
 
  timesFour(20).onComplete { result => println(s"결과: $result") }
  //=> 결과: Success(80)
}

역시 아주 쉬워졌습니다. 위의 for-컴프리헨션은 아래와 같이 flatMapmap으로 풀어서 쓸 수도 있습니다.

def timesFour(n: Int)(implicit ec: ExecutionContext): Future[Int] =
  timesTwo(n).flatMap { a =>
    timesTwo(n).map { b =>
      a + b
    }
  }

프로젝트에서 scala-async를 사용하면 다음과 같이 할 수도 있습니다.

import scala.async.Async.{async, await}
 
def timesFour(n: Int)(implicit ec: ExecutionContext): Future[Int] =
  async {
    val a = await(timesTwo(a))
    val b = await(timesTwo(b))
    a + b
  }

scala-async 라이브러리는 매크로를 이용해서 여러분의 코드를 flatMapmap을 이용하는 코드로 변환합니다. 다시 말해서, await는 스레드를 봉쇄(block)하지 않습니다. 그럴 것 같은 인상을 주더라도 말이죠.

scala-async는 정말로 훌륭해 보이지만, 안타깝게도 여러가지 한계가 있습니다. 예를 들어서 익명 함수 안에 있는 await는 변환되지 않습니다. 아쉽지만 Scala 코드는 보통 그런 표현식으로 넘쳐납니다. 다음과 같은 코드도 작동하지 않습니다.

// 영 좋지 못한 예시
def sum(list: List[Future[Int]])(implicit ec; ExecutionContext): Future[Int] =
  async {
    var sum = 0
    // for는 foreach로 변환되기 때문에 작동하지 않습니다
    for (f <- list) {
      sum += await(f)
    }
  }

scala-async의 접근법은 마치 1급 지속을 구현한 것과 같은 착각을 일으킵니다. 하지만 그러한 지속은 안타깝게도 1급이 아니고, 그저 컴파일러에 의해서 관리되는 코드 재작성일 뿐입니다. 그리고 이러한 한계는 C#과 ECMAScript에도 똑같이 적용됩니다. 그리고 이것은 안타깝게도 함수형 프로그래밍에서 async 키워드가 그다지 많이 쓰일 수 없음을 의미합니다.

'은 총알은 없다'는 저의 불평이 다시 떠오른다면 좋겠습니다. 😞

병렬 실행

3.2절에서와 마찬가지로 두 번의 함수 호출은 서로에 대해 독립적이기 때문에 병렬적으로 실행할 수 있습니다. 초보자에게는 계산 구문(evaluation semantics)이 조금 난해할 수 있지만, Future를 사용하면 이 작업은 더 쉬워집니다.

def timesFourInParallel(n: Int)(implicit ec: ExecutionContext): Future[Int] = {
  // Future는 조급하게 계산되므로, 둘 모두 값을 합치기 전에 실행됩니다
  val fa = timesTwo(n)
  val fb = timesTwo(n)
 
  for (a <- fa; b <- fb) yield a + b
  // fa.flatMap(a => fb.map(b => a + b))
}

이러한 방법은 초보자에게는 조금 헷갈릴 수 있습니다. 작업이 병렬적(parallel)으로 실행되도록 하기 위해서는 간단히 결괏값을 합치기 이전에 Future를 생성하면 됩니다.

다른 방법으로, 임의의 컬렉션에 대해 Future.sequence를 사용할 수도 있습니다.

def timesFourInParallel(n: Int)(implicit ec: ExecutionContext): Future[Int] =
  Future.sequence(timesTwo(n) :: timesTwo(n) :: Nil).map(_.sum)

이 방법도 초보자에게는 조금 충격적일 수도 있습니다. 여기서 Future들은 그 컬렉션이 엄격(strict)하게 정의되었을 때에만 병렬적(parallel)으로 실행되기 때문입니다. 이때 '엄격하게'라는 말은 Scala의 Stream이나 Iterator 등으로 정의되지 않았음을 의미합니다. 이러한 표현이 초보자에게는 불명확한 탓도 있을 듯합니다.

재귀 실행

Future형은 재귀 동작에 대해서 완전히 안전합니다. (콜백 호출을 ExecutionContext에 의존하고 있기 때문입니다.) 3.3절의 예시를 다시 작성해 봅시다.

def mapBoth[A,B,R](fa: Future[A], fb: Future[B])(f: (A,B) => R)
  (implicit ec: ExecutionContext): Future[R] = {
 
  for (a <- fa; b <- fb) yield f(a,b)
}
 
def sequence[A](list: List[Future[A]])
  (implicit ec: ExecutionContext): Future[List[A]] = {
 
  val seed = Future.successful(List.empty[A])
  list.foldLeft(seed)((acc,f) => for (l <- acc; a <- f) yield a :: l)
    .map(_.reverse)
}
 
// 사용법
{
  import scala.concurrent.ExecutionContext.Implicits.global
 
  sequence(List(timesTwo(10), timesTwo(20), timesTwo(30))).foreach(println)
  // => List(20, 40, 60)
}

이번에는 StackOverflowError가 발생하지 않습니다.

val list = 0.until(10000).map(timesTwo).toList
sequence(list).foreach(r => println(s"합: ${r.sum}"))
//=> 합: 99990000

성능 고려사항

Future의 문제점은 onComplete를 호출할 때마다 ExecutionContext가 사용된다는 점입니다. 이러한 동작은 일반적으로 Runnable이 스레드 풀로 전달하는 것이고, 논리 스레드를 포크(fork)하게 됩니다. CPU-bound 작업이 많다면 Future의 구현은 성능에 재앙과도 같습니다. 스레드를 이동하는 동작은 문맥 교환(context switch)을 발생시키고 CPU 캐시의 참조 국소성(CPU cache locality)을 파괴하기 때문입니다. 물론 Future의 구현은 어느정도의 최적화를 수행하긴 합니다. 내부 콜백으로 인해서 새 스레드가 포크되지 않도록 flatMap은 내부 실행 맥락(internal execution context)을 사용합니다. 하지만 그러한 최적화로는 부족하고, 벤치마크 결과는 거짓말을 하지 않습니다.

또 메모아이제이션이 이루어진다는 것은, 작업이 완료되었을 때 생산자(producer)와 소비자(listener)에서 각각 한 번 씩 AtomicReference.compareAndSet이 호출된다는 의미입니다. 메모아이제이션이 멀티스레드 환경에서 잘 작동하도록 하는 이 호출들은 꽤 많은 비용이 듭니다.

다시 말해서, 여러분이 CPU의 성능을 최대로 끌어올려서 CPU-bound 작업을 수행하려고 한다면 FuturePromise를 사용하는 것은 그다지 좋은 생각이 아닙니다.

Scala의 FutureTask 구현의 성능 비교는, 다음의 최근 벤치마크 결과로 볼 수 있습니다.

[info] Benchmark                   (size)   Mode  Cnt     Score     Error  Units
[info] FlatMap.fs2Apply             10000  thrpt   20   291.459 ±   6.321  ops/s
[info] FlatMap.fs2Delay             10000  thrpt   20  2606.864 ±  26.442  ops/s
[info] FlatMap.fs2Now               10000  thrpt   20  3867.300 ± 541.241  ops/s
[info] FlatMap.futureApply          10000  thrpt   20   212.691 ±   9.508  ops/s
[info] FlatMap.futureSuccessful     10000  thrpt   20   418.736 ±  29.121  ops/s
[info] FlatMap.futureTrampolineEc   10000  thrpt   20   423.647 ±   8.543  ops/s
[info] FlatMap.monixApply           10000  thrpt   20   399.916 ±  15.858  ops/s
[info] FlatMap.monixDelay           10000  thrpt   20  4994.156 ±  40.014  ops/s
[info] FlatMap.monixNow             10000  thrpt   20  6253.182 ±  53.388  ops/s
[info] FlatMap.scalazApply          10000  thrpt   20   188.387 ±   2.989  ops/s
[info] FlatMap.scalazDelay          10000  thrpt   20  1794.680 ±  24.173  ops/s
[info] FlatMap.scalazNow            10000  thrpt   20  2041.300 ± 128.729  ops/s

CPU-bound 작업은 Monix Task가 Scala의 Future에 비해 압도적인 성능을 보여주고 있습니다.

원주: 이 벤치마크 결과는 한정적입니다. 여전히 Future가 더 빠른 경우도 있고 (Monix의 Observer는 몇 가지 합당한 이유로 인해 Future를 사용하고 있습니다), 애초에 CPU-bound 작업의 성능이 크게 중요하지 않은 경우도 자주 있습니다. 예를 들어서 I/O 작업을 할 때는 처리량과 CPU 속도가 별 상관이 없습니다.

Task와 Scala의 IO 모나드

Task is a data type for controlling possibly lazy & asynchronous computations, useful for controlling side-effects, avoiding nondeterminism and callback-hell.

The Monix library provides a very sophisticated Task implementation, inspired by the Task in Scalaz. Same concept, different implementation.

The Task type is also inspired by Haskell's IO monad, being in this author's opinion the true IO type for Scala.

This is a matter of debate, as Scalaz also exposes a separate IO type that only deals with synchronous execution. The Scalaz IO is not async, which means that it doesn't tell the whole story, because on top of the JVM you need to represent async computations somehow. In Haskell on the other hand you have the Async type which is converted to IO, possibly managed by the runtime (green-threads and all).

On the JVM, with the Scalaz implementation, we can't represent async computations with IO and without blocking threads on evaluation, which is something to avoid, because blocking threads is error prone.

In summary the Task type:

  • models lazy & asynchronous evaluation
  • models a producer pushing only one value to one or multiple consumers
  • it is lazily evaluated, so compared with Future it doesn’t trigger the execution, or any effects until runAsync
  • it is not memoized by default on evaluation, but the Monix Task can be
  • doesn’t necessarily execute on another logical thread

Specific to the Monix implementation:

  • allows for cancelling of a running computation
  • never blocks any threads in its implementation
  • does not expose any API calls that can block threads
  • all async operations are stack safe
  • A visual representation of where Task sits in the design space:
Eager Lazy
Synchronous A () ⇒ A
Coeval[A], IO[A]
Asynchronous (A ⇒ Unit) ⇒ Unit (A ⇒ Unit) ⇒ Unit
Future[A] Task[A]

Sequencing

Redefining our function from section 3 in terms of Task:

import monix.eval.Task
 
def timesTwo(n: Int): Task[Int] =
  Task(n * 2)
 
// Usage
{
  // Our ExecutionContext needed on evaluation
  import monix.execution.Scheduler.Implicits.global
 
  timesTwo(20).foreach { result => println(s"Result: $result") }
  //=> Result: 40
}

The code seems to be almost the same as the Future version in section 4.1, the only difference is that our timesTwo function no longer takes an ExecutionContext as a parameter. This is because Task references are lazy, being like functions, so nothing gets printed until the call to foreach which forces the evaluation to happen. It is there that we need a Scheduler, which is Monix's enhanced ExecutionContext.

Now to do sequencing like in section 3.1:

def timesFour(n: Int): Task[Int] =
  for (a <- timesTwo(n); b <- timesTwo(n)) yield a + b
 
// Usage
{
  // Our ExecutionContext needed on evaluation
  import monix.execution.Scheduler.Implicits.global
 
  timesFour(20).foreach { result => println(s"Result: $result") }
  //=> Result: 80
}

And just like with the Future type, that “for comprehension” magic is translated by the Scala compiler to nothing more than calls to flatMap and map, literally equivalent with:

def timesFour(n: Int): Task[Int] =
  timesTwo(n).flatMap { a =>
    timesTwo(n).map { b => a + b }
  }

Parallelism

The story for Task and parallelism is better than with Future, because Task allows fine-grained control when forking tasks, while trying to execute transformations (e.g. map, flatMap) on the current thread and call-stack, thus preserving cache locality and avoiding context switches for what is in essence sequential work.

But first, translating the sample using Future does not work:

// BAD SAMPLE (for achieving parallelism, as this will be sequential)
def timesFour(n: Int): Task[Int] = {
  // Will not trigger execution b/c Task is lazy
  val fa = timesTwo(n)
  val fb = timesTwo(n)
  // Evaluation will be sequential b/c of laziness
  for (a <- fa; b <- fb) yield a + b
}

In order to achieve parallelism ''Task'' requires you to be explicit about it:

def timesFour(n: Int): Task[Int] =
  Task.mapBoth(timesTwo(n), timesTwo(n))(_ + _)

Oh, does mapBoth seem familiar? If those two tasks fork threads on execution, then they will get executed in parallel as mapBoth starts them both at the same time.

Recursivity

Task is recursive and stack-safe (in flatMap) and incredibly efficient, being powered by an internal trampoline. You can checkout this cool paper by Rúnar Bjarnason on Stackless Scala with Free Monads for getting a hint on how Task got implemented so efficiently.

The sequence implementation looks similar with the one for Future in section 4.3, except that you can see the laziness in the signature of sequence:

def sequence[A](list: List[Task[A]]): Task[List[A]] = {
  val seed = Task.now(List.empty[A])
  list.foldLeft(seed)((acc,f) => for (l <- acc; a <- f) yield a :: l)
    .map(_.reverse)
}
 
// Invocation
{
  // Our ExecutionContext needed on evaluation
  import monix.execution.Scheduler.Implicits.global
 
  sequence(List(timesTwo(10), timesTwo(20), timesTwo(30))).foreach(println)
  // => List(20, 40, 60)
}

Functional Programming and Type-classes

When working with well grown functions such as map, flatMap and mapBoth, we no longer care that underlying it all is an "(A => Unit) => Unit", because these functions are, assuming lawfulness, pure and referentially transparent. This means we can reason about them and their result, divorced from their surrounding context.

This is the great achievement of Haskell's IO. Haskell does not “fake” side-effects, as functions returning IO values are literally pure, the side-effects being pushed at the edges of the program where they belong. And we can say the same thing about Task. Well, for Future it's more complicated given its eager nature, but working with Future is not bad either.

And can we build interfaces that abstract over such types as Task, Future, Coeval, Eval, IO, Id, Observable and others?

Yes we can, we've already seen that flatMap describes sequencing, while mapBoth describes parallelism. But we can't describe them with classic OOP interfaces, for one because due to the covariance and contravariance rules of Function1 parameters we'd lose type info in flatMap (unless you use F-bounded polymorphic types, which are more suitable for implementation reuse and aren't available in other OOP languages), but also because we need to describe a data constructor that can't be a method (i.e. OOP subtyping applies to instances and not whole classes).

Fortunately Scala is one of the very few languages capable of higher kinded types and with the ability to encode type-classes, which means we've got everything needed to port concepts from Haskell 😄

Author's Rant: The dreaded Monad, Applicative and Functor words strike fear in the hearts of the unfaithful, having given rise to the belief that they are “academic” notions disconnected from real-world concerns, with book authors going to great length to avoid using these words, which includes Scala's API documentation and official tutorials.

But this is a disservice to both the Scala language and its users. In other languages they are only design patterns that are hard to explain primarily because they can't be expressed as types. You can count the languages having this expressive capability with one hand. And users suffer because in case of trouble they don't know how to search for existing literature on the subject, having been deprived of learning the correct jargon.

I also feel this is a flavor of anti-intellectualism, as usual born out of fear of the unknown. You can see it coming from people that really know what they are doing, as none of us is immune. For example Java's Optional type violates the functor laws (e.g. opt.map(f).map(g) != opt.map(f andThen g)), in Swift 5 == Some(5) which is preposterous and good luck explaining to people that Some(null) actually makes sense for as long as null is a valid value of AnyRef and because otherwise you can't define Applicative[Option].

Monad (Sequencing and Recursivity)

This article is not about explaining Monads. There are other great articles for that. But if you're looking to build an intuition, here's another one: in the context of data types such as Future or Task, Monads describe sequencing of operations and is the only reliable way to ensure ordering.

Aleksey Shipilëv: “Observation: programmers doing concurrency with imperative languages are tripped by the unchallenged belief that ”;“ defines sequencing.”

A simple encoding of the Monad type in Scala:

// We shouldn't need to do this :-(
import scala.language.higherKinds
 
trait Monad[F[_]] {
  /** Constructor (said to lift a value `A` in the `F[A]`
    * monadic context). Also part of `Applicative`, see below.
    */
  def pure[A](a: A): F[A]
 
  /** FTW */
  def flatMap[A,B](fa: F[A])(f: A => F[B]): F[B]
}

And providing an implementation for Future:

import scala.concurrent._
 
// Supplying an instance for Future isn't clean, ExecutionContext needed
class FutureMonad(implicit ec: ExecutionContext)
  extends Monad[Future] {
 
  def pure[A](a: A): Future[A] =
    Future.successful(a)
 
  def flatMap[A,B](fa: Future[A])(f: A => Future[B]): Future[B] =
    fa.flatMap(f)
}
 
object FutureMonad {
  implicit def instance(implicit ec: ExecutionContext): FutureMonad =
    new FutureMonad
}

This is really powerful stuff. We can now describe a generic function that works with Task, Future, IO, whatever, although it would be great if the flatMap operation is stack-safe:

/** Calculates the N-th number in a Fibonacci series. */
def fib[F[_]](n: Int)(implicit F: Monad[F]): F[BigInt] = {
  def loop(n: Int, a: BigInt, b: BigInt): F[BigInt] =
    F.flatMap(F.pure(n)) { n =>
      if (n <= 1) F.pure(b)
      else loop(n - 1, b, a + b)
    }
 
  loop(n, BigInt(0), BigInt(1))
}
 
// Usage:
{
  // Needed in scope
  import FutureMonad.instance
  import scala.concurrent.ExecutionContext.Implicits.global
 
  // Invocation
  fib[Future](40).foreach(r => println(s"Result: $r"))
  //=> Result: 102334155
}
PRO-TIP: this is just a toy example. For getting serious, see Typelevel's Cats

Applicative (Parallelism)

Monads define sequencing of operations, but sometimes we want to compose the results of computations that are independent of each other, that can be evaluated at the same time, possibly in parallel. There's also a case to be made that applicatives are more composable than monads 😏

Let's expand our mini Typeclassopedia to put on your wall:

trait Functor[F[_]] {
  /** I hope we are all familiar with this one. */
  def map[A,B](fa: F[A])(f: A => B): F[B]
}
 
trait Applicative[F[_]] extends Functor[F] {
  /** Constructor (lifts a value `A` in the `F[A]` applicative context). */
  def pure[A](a: A): F[A]
 
  /** Maps over two references at the same time.
    *
    * In other implementations the applicative operation is `ap`,
    * but `map2` is easier to understand.
    */
  def map2[A,B,R](fa: F[A], fb: F[B])(f: (A,B) => R): F[R]
}
 
trait Monad[F[_]] extends Applicative[F] {
  def flatMap[A,B](fa: F[A])(f: A => F[B]): F[B]
}

And to expand our Future implementation:

// Supplying an instance for Future isn't clean, ExecutionContext needed
class FutureMonad(implicit ec: ExecutionContext)
  extends Monad[Future] {
 
  def pure[A](a: A): Future[A] =
    Future.successful(a)
 
  def flatMap[A,B](fa: Future[A])(f: A => Future[B]): Future[B] =
    fa.flatMap(f)
 
  def map2[A,B,R](fa: Future[A], fb: Future[B])(f: (A,B) => R): Future[R] =
    // For Future there's no point in supplying an implementation that's
    // not based on flatMap, but that's not the case for Task ;-)
    for (a <- fa; b <- fb) yield f(a,b)
}
 
object FutureMonad {
  implicit def instance(implicit ec: ExecutionContext): FutureMonad =
    new FutureMonad
}

So we can now define generic functions based on Applicative which is going to work for Future, Task, etc:

def sequence[F[_], A](list: List[F[A]])
  (implicit F: Applicative[F]): F[List[A]] = {
 
  val seed = F.pure(List.empty[A])
  val r = list.foldLeft(seed)((acc,e) => F.map2(acc,e)((l,a) => a :: l))
  F.map(r)(_.reverse)
}
PRO-TIP: worth repeating, this is just a toy example. For getting serious, see Typelevel's Cats

Can We Define a Type-class for Async Evaluation?

Missing from above is a way to actually trigger an evaluation and get a value out. Thinking of Scala's Future, we want a way to abstract over onComplete. Thinking of Monix's Task we want to abstract over runAsync. Thinking of Haskell's and Scalaz's IO, we want a way to abstract over unsafePerformIO.

The FS2 library has defined a type-class called Effect that goes like this (simplified):

trait Effect[F[_]] extends Monad[F] {
  def unsafeRunAsync[A](fa: F[A])(cb: Try[A] => Unit): Unit
}

This looks like our initial Async type, very much similar with Future.onComplete, with Task.runAsync and could be applied to IO.unsafePerformIO.

However, this is not a real type-class because:

  1. it is lawless and while that's not enough to disqualify it (after all, useful lawless type-classes like Show exist), the bigger problem is …
  2. as shown in section 3.3, in order to avoid the Wrath of StackOverflowError, we need some sort of execution context or thread-pool that can execute tasks asynchronously without blowing up the stack

And such an execution context is different from implementation to implementation. Java will use Executor, the Scala Future uses ExecutionContext, Monix uses Scheduler which is an enhanced ExecutionContext, FS2 and Scalaz use Strategy which wraps an Executor for forking threads and don't inject a context when their unsafePerformIO or runAsync gets called (which is why many of the Scalaz combinators are in fact unsafe), etc.

We could apply the same strategy as with Future, to build the type-class instance by taking a implicit whatever: Context from the scope. But that's a little awkward and inefficient. It's also telling that we can't define flatMap only in terms of Effect.unsafePerformIO, not without that execution context. And if we can't do it, then the type should probably not inherit from Monad because it's not necessarily a Monad.

So I'm personally not sure - if you have suggestions for what should be introduced in Cats, I'd love to hear them.

I do hope you enjoyed this thought experiment, designing things is fun 😎

Picking the Right Tool

Some abstractions are more general purpose than others and personally I think the mantra of “picking the right tool for the job” is overused to defend poor choices.

That said, there's this wonderful presentation by Rúnar Bjarnason called Constraints Liberate, Liberties Constrain that really drives the point home with concurrency abstractions at least.

As said, there is no silver bullet that can be generally applied for dealing with concurrency. The more high-level the abstraction, the less scope it has in solving issues. But the less scope and power it has, the simpler and more composable the model is. For example many developers in the Scala community are overusing Akka Actors - which is a great library, but not when misapplied. Like don't use an Akka Actor when a Future or a Task would do. Ditto for other abstractions, like the Observable pattern in Monix and ReactiveX.

Also learn by heart these 2 very simple rules:

avoid dealing with callbacks, threads and locks, because they are very error prone and not composable at all avoid concurrency like the plague it is And let me tell you, concurrency experts are first of all experts in avoiding concurrency 💀

1)
역주: 이 문서는 Monix의 개발자 Alexandru Nedelcu님이 작성한 <Asynchronous Programming and Scala>를 번역한 문서입니다.
2)
역주: 대부분의 JavaScript 런타임에서 비지니스 로직은 싱글스레드로 작동합니다. 비동기 코드가 반드시 멀티스레드 환경에서 작동하는 것은 아니라는 뜻이죠.
3)
역주: CPU-bound 작업이란, 처리 속도가 주로 CPU의 속도에 따라 결정되는 작업을 의미합니다. IO-bound, Memory-bound 등의 용어도 있습니다.
4)
역주: '비동기 범위'라는 용어가 모호하게 느껴진다면 Reactive Streams에 등록된 관련 이슈를 읽어보면 좋습니다.
5)
역주: JavaScript에서의 setTimeout을 사용한 스케줄링에 관해서는 역자가 제6회 D2 CAMPUS SEMINAR에서 자세하게 발표했었습니다. 관련 자료는 D2 공식 사이트에 등록되어 있습니다.
6)
역주: '조급한 계산'은 'eager evaluation'의 번역이며, '엄격한 계산(strict evaluation)'이라고도 합니다. 반의어로는 '느긋한 계산(lazy evaluation)'이 있습니다.
비동기_프로그래밍과_scala.1488093297.txt.gz · 마지막으로 수정됨: 2017/02/25 22:14 (바깥 편집)