비동기(asynchrony)는 어떤 곳에서도 사용될 수 있는, 병행성(concurrency)을 포괄하는 개념입니다. 이 글은 비동기 처리가 무엇인지, 어떤 문제를 해결해야 하는지에 관해 다룹니다. 1)
비동기는 멀티스레딩(multithreading)보다 넓은 개념임에도 불구하고, 일부 개발자들은 비동기와 멀티스레딩을 혼동합니다. 비동기와 멀티스레딩의 관계는 다음과 같습니다.
Multithreading <: Asynchrony
비동기 연산은 다음과 같이 형(type)을 통해서 표현할 수도 있습니다.
type Async[A] = (Try[A] => Unit) => Unit
여러 개의 Unit
반환형(return type)이 더럽다고 느껴지는 이유는, 바로 비동기가 더럽기 때문입니다. 다음과 같은 특성을 갖는 모든 작업, 프로세스, 네트워크 상의 한 노드를 비동기 연산이라고 할 수 있습니다.
비동기가 병행성(concurrency)을 포괄하긴 하지만, 그것이 필연적으로 멀티스레딩을 포괄하는 것이 아님은 매우 중요합니다. 예를 들어, JavaScript에서는 대부분의 I/O 동작이 비동기적일 뿐만 아니라, 아주 무거운 비지니스 로직도 (setTimeout
에 기반한 스케쥴링을 통해) 비동기적으로 처리될 수 있기 때문에, 인터페이스가 계속해서 응답할 수 있습니다. 2)
프로그램에 비동기를 도입하는 일은, 병행성 문제(concurrency problems)를 일으킵니다. 우리는 비동기 연산이 언제 끝날지 절대로 알 수 없어서, 동시에 작동한 여러 개의 비동기 연산의 결과를 모으는 일은 동기화(synchronization)를 필요로 하게 되기 때문입니다. 동기화 작업은 프로그램이 더 이상 동작의 순서에 의존하지 않도록 하는 작업이기도 한데, 순서로부터의 독립은 바로 비결정론적 알고리즘의 핵심 요소이기도 합니다.
비결정론적(nondeterministic) 알고리즘은 같은 입력에 대해서도 새로운 실행에서는 이전과 다르게 동작할 수 있는 알고리즘을 가리킨다. 결정론적(deterministic) 알고리즘과 반대 관계이다. … 병행(concurrent) 알고리즘은 경쟁 상태(race condition)로 인해 새로운 실행에서 이전과 다르게 동작할 수 있다. Nondeterministic algorithm, 영문 위키피디아
실력 있는 독자라면, 사용되는 방식과 환경에 따라서 조금씩의 차이는 있지만 이러한 종류의 문제가 어디에서나 발견된다는 사실을 눈치 챌 수 있습니다.
onComplete
를 구현하도록 정의된 Scala의 Future위의 모든 추상화들은 비동기를 처리하는 조금 나은 방법들입니다.
비동기적인 결과를 동기적인 결과로 변환하는 함수는 흔히 아래와 같이 기술됩니다.
def await[A](fa: Async[A]): A
그러나 우리는 비동기 처리가 다른 일반적인 함수들과 같다고 가정해서는 안됩니다. 그 이유는 CORBA의 실패로부터도 배울 수 있습니다.
비동기 처리에 있어서 우리는 흔히 분산 컴퓨팅의 함정 (fallacies of distributed computing)에 빠지곤 합니다.
당연히 위의 명제는 모두 거짓입니다. 다시 말해, 우리는 흔히 네트워크 상에서 발생한 오류, 레이턴시, 패킷 손실, 대역폭 제한을 충분히 고려하지 않은 코드를 작성한다는 의미입니다.
개발자들은 아래와 같은 방식으로 이러한 문제에 대처해 왔습니다.
이렇게 해결법이 많다는 것은, 그 중에 무엇도 일반적인 목적으로 비동기를 처리하기에 적합하지 않다는 것을 의미합니다. 메모리 관리와 비동기(asynchrony) 처리는 우리 소프트웨어 개발자들이 마주하고 있는 가장 큰 문제라는 점에서, '은 총알은 없다' 딜레마와도 관련이 있습니다.
원주 경고: 개인적인 의견과 불평입니다.
어떤 사람들은 Golang과 같은 M:N 플랫폼을 자랑합니다만, 저는 JVM이나 .Net과 같은 1:1 멀티스레딩 플랫폼을 좋아합니다.
프로그래밍 언어에 충분한 표현성(Scala의 Future와 Promise, Task, Closure의 core.async)이 있는 한, 우리는 1:1 멀티스레딩 플랫폼 위에 M:N 플랫폼도 구현할 수 있기 때문입니다. 하지만 M:N 런타임에서 M:N 멀티스레딩에 문제가 발생했을 때는, 플랫폼을 바꾸지 않고서는 문제를 해결할 수 없습니다. 그리고 실제로 대부분의 M:N 플랫폼은 자주 문제가 생깁니다.
모든 가능한 해결법을 배우고 결정을 내리는 것은 실로 엄청나게 고통스러운 작업이지만, 그래도 충분히 알지 못하고 결정을 내리는 것보다는 낫습니다. 이 문제에 한해서는 TOOWTDI 혹은 “나쁜 기능이 더 낫다”를 신념으로 갖지 않는 것이 좋습니다. Scala나 Haskell과 같은 표현적인(expressive) 신생 언어의 학습 난이도가 너무 높다는 불평은 문제의 요점과 떨어져 있습니다. 새로운 언어를 배우는 것은 병행성(concurreny)을 다루는 데에 있어서 가장 사소한 문제이기 때문입니다. 저는 심지어 병행성의 문제를 해결하지 못해 소프트웨어 업계를 떠난 사람들도 알고 있습니다.
예를 들어서, 두 개의 비동기 작업을 시작하고 그 결과들을 합친다고 합시다.
먼저 작업을 비동기적으로 실행하는 함수를 정의합니다.
import scala.concurrent.ExecutionContext.global type Async[A] = (A => Unit) => Unit def timesTwo(n: Int): Async[Int] = onFinish => { global.execute(new Runnable { def run(): Unit = { val result = n * 2 onFinish(result) } }) } // 사용법 timesTwo(20) { result => println(s"결과: $result") } //=> 결과: 40
하나의 실행이 끝나면 다음을 연속해서 실행하는 방법으로 다음과 같이 두 개의 비동기 결괏값을 합칠 수 있습니다.
def timesFour(n: Int): Async[Int] = onFinish => { timesTwo(n) { a => timesTwo(n) { b => // 두 개의 결과를 합친다 onFinish(a + b) } } } // 사용법 timesFour(20) { result => println(s"결과: $result") } //=> 결과: 80
두 개의 결괏값을 합치는 정도는 간단합니다.
하지만 비동기성은 관련된 모든 것들에 영향을 미친다는 문제가 있습니다. 여러분이 다음과 같은 순수 함수를 만들었다고 합시다.
def timesFour(n: Int): Int = n * 4
하지만 EJB를 사용하려는 EA가 여러분에게 순수 함수가 아닌 비동기 timesTwo
함수를 사용하라고 지시합니다. 이제 여러분의 timesFour
구현은 순수한 수학적 함수에서 side-effect가 넘쳐나는 함수가 될 수밖에 없습니다. 충분히 좋은 Async
형이 없다면 우리는 side-effect가 넘쳐나는 콜백으로 이루어진 작업 흐름을 다뤄야 합니다. 물론 결괏값이 나올 때까지 봉쇄(block)하는 것도 도움이 될 수 없습니다. 그저 문제를 숨기는 것일 뿐이니까요. 2장을 보면 이유를 알 수 있습니다.
게다가 문제는 점점 더 복잡해집니다. 😷
위의 '연속 실행'에서의 두 번째 실행은 첫 번째 실행에 의존하지 않습니다. 즉, 두 실행은 병렬적(parallel)으로 처리될 수 있습니다. JVM에서는 CPU-bound 작업3)들을 병렬적으로 실행할 수 있고, JavaScript에서도 Ajax 요청을 보내거나 Web Worker API를 사용할 때 병렬 처리가 가능합니다.
안타깝게도 병렬 실행에서는 일이 조금 복잡해집니다. 아래와 같은 순진한 접근 방법은 완전히 틀렸습니다.
// 나쁜 예시 def timesFourInParallel(n: Int): Async[Int] = onFinish => { var cacheA = 0 timesTwo(n) { a => cacheA = a } timesTwo(n) { b => // 두 개의 결과를 합친다 onFinish(cacheA + b) } } timesFourInParallel(20) { result => println(s"결과: $result") } //=> 결과: 80 timesFourInParallel(20) { result => println(s"결과: $result") } //=> 결과: 40
바로 이것이 비결정론의 실제 예시입니다. 두 개의 작업이 종료되는 순서가 보장되지 않기 때문에 우리는 동기화를 수행하는 소규모의 상태 기계(state machine)를 구현해야 합니다.
먼저 상태 기계의 ADT를 정의합니다.
// 상태 기계 정의 sealed trait State // 초기 상태 case object Start extends State // B가 종료되어 A를 대기하는 상태 final case class WaitForA(b: Int) extends State // A가 종료되어 B를 대기하는 상태 final case class WaitForB(a: Int) extends State
다음으로, 상태 기계를 비동기적으로 구현합니다.
// JVM에서는 나쁜 예시 (JavaScript에서는 돌아감) def timesFourInParallel(n: Int): Async[Int] = { onFinish => { var state: State = Start timesTwo(n) { a => state match { case Start => state = WaitForB(a) case WaitForA(b) => onFinish(a + b) case WaitForB(_) => // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록 throw new IllegalStateException(state.toString) } } timesTwo(n) { b => state match { case Start => state = WaitForA(b) case WaitForB(a) => onFinish(a + b) case WaitForA(_) => // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록 throw new IllegalStateException(state.toString) } } } }
위의 해결법을 시각화하면 아래와 같은 상태 기계로 나타낼 수 있습니다.
하지만 아직 문제가 남아있습니다. JVM은 1:1 멀티스레딩 플랫폼이기 때문에 공유 메모리에 병행적으로 접근(shared memory concurrency)하게 됩니다. 따라서 state
에 대한 접근이 동기화되어야 합니다.
사용할 수 있는 한 가지 해결 방법은 synchronized
블록을 사용하는 것입니다. 이 방법은 '암묵적인 락(intrinsic lock)'이라고도 부릅니다.
// 모니터처럼 작동할 수 있는 공통 참조 생성 val lock = new AnyRef var state: State = Start timesTwo(n) { a => lock.synchronized { state match { case Start => state = WaitForB(a) case WaitForA(b) => onFinish(a + b) case WaitForB(_) => // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록 throw new IllegalStateException(state.toString) } } } //...
이러한 고수준의 락(high-level lock)은 여러 개의 스레드가 병렬적(parallel)으로 (위의 state
와 같은) 리소스에 접근하지 못하도록 보호합니다. 하지만 저는 개인적으로 고수준 락의 사용을 피합니다. 커널의 스케쥴러는 임의의 스레드를 임의의 이유로 정지시킬 수 있는데, 락을 갖고 있는 스레드가 커널에 의해 정지되면 다른 모든 스레드가 정지되기 때문입니다. 정지를 피하고 끊임 없이 작업을 실행하기 위해서는 비봉쇄 알고리즘(non-blocking logic)이 더 낫습니다.
그래서 이 경우에는 AtomicReference를 사용하는 것이 정답입니다.
// JVM에서 잘 돌아가는 예시 import scala.annotation.tailrec import java.util.concurrent.atomic.AtomicReference def timesFourInParallel(n: Int): Async[Int] = { onFinish => { val state = new AtomicReference[State](Start) @tailrec def onValueA(a: Int): Unit = state.get match { case Start => if (!state.compareAndSet(Start, WaitForB(a))) onValueA(a) // 재시도 case WaitForA(b) => onFinish(a + b) case WaitForB(_) => // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록 throw new IllegalStateException(state.toString) } timesTwo(n)(onValueA) @tailrec def onValueB(b: Int): Unit = state.get match { case Start => if (!state.compareAndSet(Start, WaitForA(b))) onValueB(b) // 재시도 case WaitForB(a) => onFinish(a + b) case WaitForA(_) => // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록 throw new IllegalStateException(state.toString) } timesTwo(n)(onValueB) } }
이제 조금 어려워졌나요? 조금 더 힘내봅시다.
위의 onFinish
의 호출이 스택-안전하지 않고(stack-unsafe), 호출 시점에서 비동기 범위(asynchronous boundary)를 강제하지 않으면 프로그램이 StackOverflowError
로 터질 수 있다고 한다면, 어떤 생각이 드시나요? 4)
말 그대로의 의미입니다. 앞서 작성한 코드를 제네릭한 방법(generic way)으로 다시 작성해 봅시다.
import scala.annotation.tailrec import java.util.concurrent.atomic.AtomicReference type Async[+A] = (A => Unit) => Unit def mapBoth[A,B,R](fa: Async[A], fb: Async[B])(f: (A,B) => R): Async[R] = { // 상태 기계 정의 sealed trait State[+A,+B] // 초기 상태 case object Start extends State[Nothing, Nothing] // B가 종료되어 A를 대기하는 상태 final case class WaitForA[+B](b: B) extends State[Nothing,B] // A가 종료되어 B를 대기하는 상태 final case class WaitForB[+A](a: A) extends State[A,Nothing] onFinish => { val state = new AtomicReference[State[A,B]](Start) @tailrec def onValueA(a: A): Unit = state.get match { case Start => if (!state.compareAndSet(Start, WaitForB(a))) onValueA(a) // 재시도 case WaitForA(b) => onFinish(f(a,b)) case WaitForB(_) => // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록 throw new IllegalStateException(state.toString) } @tailrec def onValueB(b: B): Unit = state.get match { case Start => if (!state.compareAndSet(Start, WaitForA(b))) onValueB(b) // retry case WaitForB(a) => onFinish(f(a,b)) case WaitForA(_) => // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록 throw new IllegalStateException(state.toString) } fa(onValueA) fb(onValueB) } }
그리고 Scala의 Future.sequence
와 유사한 기능을 구현합시다. 우리의 의지는 강인하고 용기는 무한하니까요. 😇
def sequence[A](list: List[Async[A]]): Async[List[A]] = { @tailrec def loop(list: List[Async[A]], acc: Async[List[A]]): Async[List[A]] = list match { case Nil => onFinish => acc(r => onFinish(r.reverse)) case x :: xs => val update = mapBoth(x, acc)(_ :: _) loop(xs, update) } val empty: Async[List[A]] = _(Nil) loop(list, empty) } // 호출 sequence(List(timesTwo(10), timesTwo(20), timesTwo(30))) { r => println(s"결과: $r") } //=> 결과: List(20, 40, 60)
이것으로 끝났을까요? 아닙니다.
val list = 0.until(10000).map(timesTwo).toList sequence(list)(r => println(s"합: ${r.sum}"))
자, 위의 코드를 실행시키고 영광스러운 메모리 오류를 목도하십시오. 프로덕션 환경이었다면 아래의 오류는 Scala의 NonFatal
로 처리되지도 않아서 프로그램이 크래시를 일으켰을 겁니다.
java.lang.StackOverflowError at java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:2414) at java.util.concurrent.ForkJoinPool.execute(ForkJoinPool.java:2630) at scala.concurrent.impl.ExecutionContextImpl$$anon$3.execute(ExecutionContextImpl.scala:131) at scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:20) at .$anonfun$timesTwo$1(<pastie>:27) at .$anonfun$timesTwo$1$adapted(<pastie>:26) at .$anonfun$mapBoth$1(<pastie>:66) at .$anonfun$mapBoth$1$adapted(<pastie>:40) at .$anonfun$mapBoth$1(<pastie>:67) at .$anonfun$mapBoth$1$adapted(<pastie>:40) at .$anonfun$mapBoth$1(<pastie>:67) at .$anonfun$mapBoth$1$adapted(<pastie>:40) at .$anonfun$mapBoth$1(<pastie>:67)
앞서 말씀드린 것과 같이, 강제된 비동기 범위 없이 실행된 onFinish
는 스택 오버플로우를 유발할 수 있습니다. JavaScript에서는 setTimeout
을 사용한 스케쥴링으로 문제를 해결할 수 있으며, JVM에서는 스레드 풀을 사용하거나 Scala의 ExecutionContext
를 사용할 수 있습니다. 5)
scala.concurrent.Future
는 earger evaluation이 적용되는 비동기 연산을 기술하는 방법입니다. 지금까지 위에서 사용한 Async
형과도 유사합니다.
Future와 Promise는 일부 병행 프로그래밍 언어(concurrent programming language)에서 프로그램의 실행을 동기화하기 위해서 사용하는 생성자이다. Future와 Promise가 기술하는 객체는 연산이 아직 완료되지 않아 초기에는 계산할 수 없는 값에 대한 대리자(proxy)이다. Futures and promises, 영문 위키피디아
원주 필자의 불평:
Future와 Promise에 관한 docs.scala-lang.org 문서의 다음 설명은 오해를 불러일으키기 쉽고 오개념의 원인이 됩니다.
“Future는 여러 동작을 병렬적(paraell)으로 수행하는 방법을 제공합니다. 이러한 방법은 효율적이며 비봉쇄적(non-blocking)입니다.”
Future
형은 비동기(asynchrony)를 기술하는 것이지, 병렬성(parallelism)을 기술하는 것이 아닙니다. 물론Future
를 사용해서 병렬처리를 하는 것도 가능하지만, 그렇다고 그것이 병렬처리만을 할 수 있다는 뜻은 아닙니다. 게다가 CPU의 성능을 최대로 끌어올릴 수 있는 방법을 찾는 사람들에게 있어서Future
는 비용이 크고 현명하지 못한 선택입니다. (4.4절 참고)
Future
는 다음과 같이 두 개의 주요 연산(primary operation)과, 주요 연산을 기반으로 정의되는 여러 함수 콤비네이터들로 정의되는 인터페이스입니다.
import scala.util.Try import scala.concurrent.ExecutionContext trait Future[+T] { // abstract def value: Option[Try[T]] // abstract def onComplete(f: Try[T] => Unit)(implicit ec: ExecutionContext): Unit // 값 변형 def map[U](f: T => U)(implicit ec: ExecutionContext): Future[U] = ??? // 연속 실행 ;-) def flatMap[U](f: T => Future[U])(implicit ec: ExecutionContext): Future[U] = ??? // ... }
Future
의 특성은 다음과 같습니다.
value
속성은 결괏값을 메모아이즈하기 위해 존재합니다. 아직 연산이 완료되지 않은 경우에는 None
이 할당됩니다. 당연한 이야기지만, value
속성의 def value
를 호출하면 비결정론적인 결과를 얻습니다.
다음은 ExecutionContext
에 대한 부연설명입니다.
ExecutionContext
는 비동기 실행을 관리합니다. 스레드 풀과 유사하다고 할 수 있겠지만, 반드시 스레드 풀의 역할만을 하는 것은 아닙니다. (비동기는 멀티스레딩이나 병렬성과 다르니까요)onComplete
는 기본적으로 우리가 위에서 정의한 Async
형과 유사하지만, ExecutionContext
를 인수로 받습니다. 모든 완료 콜백은 비동기적으로 호출되어야 하기 때문입니다.onComplete
를 기반으로 구현되었기 때문에 ExecutionContext
를 인수로 받습니다.
왜 모든 시그니처에 ExecutionContext
를 붙여야 하는지 아직 이해하지 못하겠다면 3.3절을 다시 읽어보세요.
3장에서 만든 함수를 Future
로 다시 정의해 봅시다.
import scala.concurrent.{Future, ExecutionContext} def timesTwo(n: Int)(implicit ec: ExecutionContext): Future[Int] = Future(n * 2) // 사용법 { import scala.concurrent.ExecutionContext.Implicits.global timesTwo(20).onComplete { result => println(s"결과: $result") } //=> 결과: Success(40) }
아주 쉽습니다. Future.apply
메서드는 주어진 ExecutionContext
에서 주어진 연산을 수행합니다. JVM에서 global
컨텍스트를 선택했으므로, 연산은 별개의 스레드에서 실행됩니다.
3.1절의 연속 실행은 다음과 같이 구현할 수 있습니다.
def timesFour(n: Int)(implicit ec: ExecutionContext): Future[Int] = for (a <- timesTwo(n); b <- timesTwo(n)) yield a + b // 사용법 { import scala.concurrent.ExecutionContext.Implicits.global timesFour(20).onComplete { result => println(s"결과: $result") } //=> 결과: Success(80) }
역시 아주 쉬워졌습니다. 위의 for-comprehension은 아래와 같이 flatMap
과 map
으로 풀어서 쓸 수도 있습니다.
def timesFour(n: Int)(implicit ec: ExecutionContext): Future[Int] = timesTwo(n).flatMap { a => timesTwo(n).map { b => a + b } }
프로젝트에서 scala-async를 사용하면 다음과 같이 할 수도 있습니다.
import scala.async.Async.{async, await} def timesFour(n: Int)(implicit ec: ExecutionContext): Future[Int] = async { val a = await(timesTwo(a)) val b = await(timesTwo(b)) a + b }
scala-async
라이브러리는 매크로를 이용해서 여러분의 코드를 flatMap
과 map
을 이용하는 코드로 변환합니다. 다시 말해서, await
는 스레드를 봉쇄(block)하지 않습니다. 그럴 것 같은 인상을 주더라도 말이죠.
scala-async
는 정말로 훌륭해 보이지만, 안타깝게도 여러가지 한계가 있습니다. 예를 들어서 익명 함수 안에 있는 await
는 변환되지 않습니다. 아쉽지만 Scala 코드는 보통 그런 표현식으로 넘쳐납니다. 다음과 같은 코드도 작동하지 않습니다.
// 나쁜 예시 def sum(list: List[Future[Int]])(implicit ec; ExecutionContext): Future[Int] = async { var sum = 0 // for는 foreach로 변환되기 때문에 작동하지 않습니다 for (f <- list) { sum += await(f) } }
scala-async
의 접근법은 마치 1급 지속을 구현한 것과 같은 착각을 일으킵니다. 하지만 그러한 지속은 안타깝게도 1급이 아니고, 그저 컴파일러에 의해서 관리되는 코드 재작성일 뿐입니다. 그리고 이러한 한계는 C#과 ECMAScript에도 똑같이 적용됩니다. 그리고 이것은 안타깝게도 함수형 프로그래밍에서 async
키워드가 그다지 많이 쓰일 수 없음을 의미합니다.
'은 총알은 없다'는 저의 불평이 다시 떠오른다면 좋겠습니다. 😞
3.2절에서와 마찬가지로 두 번의 함수 호출은 서로에 대해 독립적이기 때문에 병렬적으로 실행할 수 있습니다. 초보자에게는 계산 구문(evaluation semantics)이 조금 난해할 수 있지만, Future
를 사용하면 이 작업은 더 쉬워집니다.
def timesFourInParallel(n: Int)(implicit ec: ExecutionContext): Future[Int] = { // Future는 eager evaluation을 수행하므로, 둘 모두 값을 합치기 전에 실행됩니다 val fa = timesTwo(n) val fb = timesTwo(n) for (a <- fa; b <- fb) yield a + b // fa.flatMap(a => fb.map(b => a + b)) }
이러한 방법은 초보자에게는 조금 헷갈릴 수 있습니다. 작업이 병렬적(parallel)으로 실행되도록 하기 위해서는 간단히 결괏값을 합치기 이전에 Future
를 생성하면 됩니다.
다른 방법으로, 임의의 컬렉션에 대해 Future.sequence
를 사용할 수도 있습니다.
def timesFourInParallel(n: Int)(implicit ec: ExecutionContext): Future[Int] = Future.sequence(timesTwo(n) :: timesTwo(n) :: Nil).map(_.sum)
이 방법도 초보자에게는 조금 충격적일 수도 있습니다. 여기서 Future
들은 그 컬렉션이 '엄격하게' 정의되었을 때에만 병렬적(parallel)으로 실행되기 때문입니다. 이때 '엄격하게'라는 말은 Scala의 Stream
이나 Iterator
등으로 정의되지 않았음을 의미합니다. 이러한 표현이 초보자에게는 불명확한 탓도 있을 듯합니다.
Future
형은 재귀 동작에 대해서 완전히 안전합니다. (콜백 호출을 ExecutionContext
에 의존하고 있기 때문입니다.) 3.3절의 예시를 다시 작성해 봅시다.
def mapBoth[A,B,R](fa: Future[A], fb: Future[B])(f: (A,B) => R) (implicit ec: ExecutionContext): Future[R] = { for (a <- fa; b <- fb) yield f(a,b) } def sequence[A](list: List[Future[A]]) (implicit ec: ExecutionContext): Future[List[A]] = { val seed = Future.successful(List.empty[A]) list.foldLeft(seed)((acc,f) => for (l <- acc; a <- f) yield a :: l) .map(_.reverse) } // 사용법 { import scala.concurrent.ExecutionContext.Implicits.global sequence(List(timesTwo(10), timesTwo(20), timesTwo(30))).foreach(println) // => List(20, 40, 60) }
이번에는 StackOverflowError
가 발생하지 않습니다.
val list = 0.until(10000).map(timesTwo).toList sequence(list).foreach(r => println(s"합: ${r.sum}")) //=> 합: 99990000
Future
의 문제점은 onComplete
를 호출할 때마다 ExecutionContext
가 사용된다는 점입니다. 이러한 동작은 일반적으로 Runnable
이 스레드 풀로 전달하는 것이고, 논리 스레드를 포크(fork)하게 됩니다. CPU-bound 작업이 많다면 Future
의 구현은 성능에 재앙과도 같습니다. 스레드를 이동하는 동작은 문맥 교환(context switch)을 발생시키고 CPU 캐시의 참조 국소성(CPU cache locality)을 파괴하기 때문입니다. 물론 Future
의 구현은 어느정도의 최적화를 수행하긴 합니다. 내부 콜백으로 인해서 새 스레드가 포크되지 않도록 flatMap
은 내부 실행 맥락(internal execution context)을 사용합니다. 하지만 그러한 최적화로는 부족하고, 벤치마크 결과는 거짓말을 하지 않습니다.
또 메모아이제이션이 이루어진다는 것은, 작업이 완료되었을 때 생산자(producer)와 소비자(listener)에서 각각 한 번 씩 AtomicReference.compareAndSet
이 호출된다는 의미입니다. 메모아이제이션이 멀티스레드 환경에서 잘 작동하도록 하는 이 호출들은 꽤 많은 비용이 듭니다.
다시 말해서, 여러분이 CPU의 성능을 최대로 끌어올려서 CPU-bound 작업을 수행하려고 한다면 Future
와 Promise
를 사용하는 것은 그다지 좋은 생각이 아닙니다.
Scala의 Future
와 Task
구현의 성능 비교는, 다음의 최근 벤치마크 결과로 볼 수 있습니다.
[info] Benchmark (size) Mode Cnt Score Error Units [info] FlatMap.fs2Apply 10000 thrpt 20 291.459 ± 6.321 ops/s [info] FlatMap.fs2Delay 10000 thrpt 20 2606.864 ± 26.442 ops/s [info] FlatMap.fs2Now 10000 thrpt 20 3867.300 ± 541.241 ops/s [info] FlatMap.futureApply 10000 thrpt 20 212.691 ± 9.508 ops/s [info] FlatMap.futureSuccessful 10000 thrpt 20 418.736 ± 29.121 ops/s [info] FlatMap.futureTrampolineEc 10000 thrpt 20 423.647 ± 8.543 ops/s [info] FlatMap.monixApply 10000 thrpt 20 399.916 ± 15.858 ops/s [info] FlatMap.monixDelay 10000 thrpt 20 4994.156 ± 40.014 ops/s [info] FlatMap.monixNow 10000 thrpt 20 6253.182 ± 53.388 ops/s [info] FlatMap.scalazApply 10000 thrpt 20 188.387 ± 2.989 ops/s [info] FlatMap.scalazDelay 10000 thrpt 20 1794.680 ± 24.173 ops/s [info] FlatMap.scalazNow 10000 thrpt 20 2041.300 ± 128.729 ops/s
CPU-bound 작업은 Monix Task가 Scala의 Future
에 비해 압도적인 성능을 보여주고 있습니다.
원주 이 벤치마크 결과에는 한계가 있습니다. 여전히Future
가 더 빠른 경우도 있고 (Monix의 Observer는 몇 가지 합당한 이유로 인해Future
를 사용하고 있습니다), 애초에 CPU-bound 작업의 성능이 크게 중요하지 않은 경우도 자주 있습니다. 예를 들어서 I/O 작업을 할 때는 처리량과 CPU 속도가 별 상관이 없습니다.
Task
는 lazy하고 비동기적으로 처리될 가능성이 있는 연산을 제어하기 위한 형입니다. Task
는 특히 사이드 이펙트를 제어하고 비결정론과 콜백 지옥을 방지하는 데 유용합니다.
Monix의 Task 구현은 매우 정교합니다. Scalaz의 Task로부터 많은 영향을 받았기 때문에 같은 개념으로 개발되었지만, 그 구현은 다릅니다.
또Task
형은 Haskell의 IO 모나드로부터도 영향을 받았습니다. 필자는 Task가 Scala에서의 IO형이라고 생각합니다. 다만 이 방식이 좋은 것인지에 대해서는 논란이 있습니다.
Scalaz은 동기 작업만을 위한 별도의 IO형을 갖고 있습니다. 즉 Scalaz의 IO형은 비동기가 아니기 때문에 JVM에서 어떻게든 별도로 비동기 연산을 처리할 필요가 생깁니다. 반면 Haskell에서는 Async형이 있고, 이 Async형은 IO형으로 변환되기 때문에 런타임에 관리될 수 있습니다. (이때 그린스레드 등이 사용됩니다.)
JVM를 위한 Scalaz의 구현을 따르면, 비동기 연산을 IO형으로 나타낼 방법이 없습니다. 또 Scalaz의 구현에서는 스레드를 봉쇄(block)하지 않고도 비동기 연산을 처리할 방법이 없습니다. 그러나 스레드 봉쇄(block)는 오류를 일으키기 쉬우므로 가급적 피하는 것이 좋습니다.
요약하자면 Task
형은 다음과 같은 특성이 있습니다.
Future
와 달리 runAsync
가 호출되기 전까지 어떤 실행이나 사이드 이펙트도 발생시키지 않습니다.
특히 Monix의 구현에서 Task
형은
Monix의 구현에서 Task
형은 다음과 같은 표로 정리할 수도 있습니다.
3장에서 만든 함수를 Task
를 사용해서 다시 작성해 봅시다.
import monix.eval.Task def timesTwo(n: Int): Task[Int] = Task(n * 2) // 사용법 { // 계산되는 시점에 ExecutionContext가 필요합니다 import monix.execution.Scheduler.Implicits.global timesTwo(20).foreach { result => println(s"결과: $result") } //=> 결과: 40 }
이 코드는 Future
를 사용한 4.1절의 코드와 거의 똑같아 보입니다. 유일한 차이점은 timesTwo
가 더이상 ExecutionContext
를 인수로 받지 않는다는 점입니다. 이것은 Task
참조가 마치 함수처럼 lazy하게 계산되기 때문입니다. 실제 계산이 일어나도록하는 foreach
가 호출되면 그때 비로소 결과가 출력됩니다. 그리고 바로 그때 Scheduler가 필요해집니다. Scheduler
는 Monix의 향상된 ExecutionContext
입니다.
이제 3.1절에서 했던 것처럼 해 봅시다.
def timesFour(n: Int): Task[Int] = for (a <- timesTwo(n); b <- timesTwo(n)) yield a + b // Usage { // 계산되는 시점에 ExecutionContext가 필요합니다 import monix.execution.Scheduler.Implicits.global timesFour(20).foreach { result => println(s"결과: $result") } //=> 결과: 80 }
Future
형과 마찬가지로 for-comprehension은 Scala 컴파일러에 의해서 아래와 같은 flatMap
과 map
에 대한 호출로 변환됩니다.
def timesFour(n: Int): Task[Int] = timesTwo(n).flatMap { a => timesTwo(n).map { b => a + b } }
Task
는 Future
보다 더 수월하게 병렬성을 구현할 수 있습니다. Task
는 작업을 포크할 때 더 세밀한 제어를 가능하게 할 뿐만 아니라, flatMap
과 map
등의 변형을 현재 스레드의 현재 호출 스택에서 수행하기 때문입니다. 이를 통해 캐시 국소성(cache locality)을 유지하고 문맥 교환(context switch)을 피할 수 있습니다.
그러나 다음과 같이 단순히 Future
를 Task
로 바꾸는 것만으로는 부족합니다.
// 나쁜 예 (이 코드는 여전히 연속 실행됩니다) def timesFour(n: Int): Task[Int] = { // Task는 lazy하게 계산되므로 여기서는 아직 계산되지 않습니다. val fa = timesTwo(n) val fb = timesTwo(n) // Lazy evaluation으로 인해 값이 연속으로 계산됩니다. for (a <- fa; b <- fb) yield a + b }
Task
를 사용한 병렬 실행은 다음과 같이 명시적으로 나타내야 합니다.
def timesFour(n: Int): Task[Int] = Task.mapBoth(timesTwo(n), timesTwo(n))(_ + _)
mapBoth
가 왠지 익숙하지 않나요? 이렇게 하면 mapBoth
가 두 작업을 동시에 시작하기 때문에 두 개의 스레드가 포크되면서 병렬적으로 값이 계산됩니다.
flatMap
에 대해 Task
는 재귀적이면서 스택-안전하도록 설계되었습니다. 내부적으로는 트램폴린 패턴을 사용해서 매우 효율적이기도 합니다. Task
는 Rúnar Bjarnason의 논문 <Stackless Scala with Free Monads>을 바탕으로 구현되었습니다.
Task
를 사용한 sequence
구현은 4.3절의 Future
를 사용한 구현과 유사하지만, 아래의 sequence
는 시그니처가 lazy evaluation을 나타내고 있습니다.
def sequence[A](list: List[Task[A]]): Task[List[A]] = { val seed = Task.now(List.empty[A]) list.foldLeft(seed)((acc,f) => for (l <- acc; a <- f) yield a :: l) .map(_.reverse) } // 사용법 { // 계산되는 시점에 ExecutionContext가 필요합니다 import monix.execution.Scheduler.Implicits.global sequence(List(timesTwo(10), timesTwo(20), timesTwo(30))).foreach(println) // => List(20, 40, 60) }
map
, flatMap
, mapBoth
등의 함수들을 사용할 때 우리는 더이상 “(A => Unit) => Unit
“과 같은 내부 구조를 신경쓰지 않아도 됩니다. 그러한 함수들은 규칙적(lawful)이고, 순수함수적이고, 참조-투명하기 때문입니다. 이는 우리가 함수와 함수의 결과에 대해 생각할 때 그 함수가 사용되는 환경을 신경쓰지 않아도 됨을 의미합니다.
이것이 바로 Hakell의 IO
가 대단한 이유입니다. Haksell은 사이드 이펙트를 '흉내내지' 않습니다. IO
값을 반환하는 함수들은 순수함수적이고 사이드이펙트를 프로그램의 주변부로 밀어냅니다. 이것은 Task
에 대해서도 마찬가지입니다. Future
도 eager evaluation으로 인해 조금 복잡해지긴 하지만 어쨌든 나쁘지 않은 선택입니다.
그렇다면 Task
, Future
, Coeval
, Eval
, IO
, Id
, Observable
등의 형을 추상화하는 인터페이스를 만들 수는 없을까요?
가능합니다. 우리는 이미 순차 실행을 추상화하는 flatMap
을 보았고, 병렬 실행을 추상화하는 mapBoth
를 보았습니다. 하지만 우리는 그러한 기능들을 고전적인 OOP 인터페이스로는 구현할 수 없습니다. 한 가지 이유는, Function1
인자에 대한 공변성과 반변성의 법칙(covariance and contravariance rules)에 따라 flatMap
에서 형에 대한 정보가 소실되기 때문입니다. (F-바운드 다형성 타입을 사용하면 소실을 막을 수 있지만, 이 기능은 구현 재사용에 더 적합할 뿐만 아니라 다른 OOP 언어는 지원하지도 않습니다.) 그리고 그보다 더 근본적으로, OOP에서는 데이터 생성자를 메서드로 표현할 수 없기 때문입니다. (즉, OOP의 서브타입은 클래스 전체가 아니라 각각의 인스턴스에 적용되기 때문입니다.)
마침 Scala는 상류 타입(higher kinded types)을 지원하는 몇 안되는 언어에 속하고 타입 클래스를 지원하기 때문에 Haskell의 기능을 그대로 옮겨오는 것이 가능합니다. 😄 8)
원주 저자의 분노:Monad
,Applicative
,Functor
등의 단어를 공포스럽게 생각하거나 현실과 동떨어진 “학문적인” 개념으로 치부하는 사람이 많은 탓에 많은 저자들이 저 단어들을 사용하지 않기 위해 노력합니다. 이것은 Scala의 API 문서와 공식 튜토리얼도 마찬가지입니다.
하지만 그러한 설명 방식은 Scala와 사용자 모두에게 민폐입니다. 다른 언어에서 저 개념들은 단순히 설명하기 어려운 디자인 패턴에 불과합니다. 대부분의 다른 언어들은 형에 대한 표현성이 부족하기 때문입니다. 저 개념들을 표현할 수 있는 언어는 손에 꼽습니다. 언어 사용자의 입장에서도 문제가 생겼을 때 저 개념들을 모른 채 관련된 자료를 검색하는 것은 매우 고통스러운 일입니다.
또 저는 이것이 모르는 것에 대한 본능적인 공포에서 나오는 일종의 반지성주의라고 생각합니다. 예를 들어서 Java의 Optional형은 함자 법칙(functor law)에 위배됩니다. (즉,opt.map(f).map(g) != opt.map(f andThen g)
입니다.) Swift에서는 어이없게도5 == Some(5)
입니다.Some(null)
도 이상하지 않습니다.null
이AnyRef
의 유효한 값이기 때문입니다. 그렇지 않았더라면Applicative[Option]
를 정의하는 것이 불가능했을 것입니다. 이 예시들을 어떻게 반지성주의자들에게 설명할 수 있겠습니까.
이 글의 목적은 모나드에 대해 설명하는 것이 아닙니다. 모나드에 관해서는 다른 좋은 글들이 있습니다. 모나드는 잘 모르지만 비동기 프로그래밍에 대한 감을 쌓기 위해서 이 글을 읽고 있다면 적어도 알아두어야 할 것이 있습니다. Future
나 Task
등의 자료형을 사용함에 있어서 모나드는 실행의 순서를 올바르게 표현하는 유일한 방법이라는 사실입니다.
관찰 결과: 명령형 언어(imperative language)로 병행성(concurrency)을 다루는 프로그래머들은 ”;”이 순서를 정의한다는 맹목적인 믿음으로 인해 함정에 빠지고 만다. Aleksey Shipilëv
Monad
형을 Scala로 표현해 봅시다.
// 이 줄을 적지 않아도 된다면 좋을텐데 말이죠 :-( import scala.language.higherKinds trait Monad[F[_]] { /** 생성자 (`A`를 `F[A]` Monad로 리프팅합니다.) * `Applicative`의 일부이기도 합니다. 아래를 보세요. */ def pure[A](a: A): F[A] /** 빰 */ def flatMap[A,B](fa: F[A])(f: A => F[B]): F[B] }
이어서 Future
를 구현합시다.
import scala.concurrent._ // Supplying an instance for Future isn't clean, ExecutionContext needed class FutureMonad(implicit ec: ExecutionContext) extends Monad[Future] { def pure[A](a: A): Future[A] = Future.successful(a) def flatMap[A,B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f) } object FutureMonad { implicit def instance(implicit ec: ExecutionContext): FutureMonad = new FutureMonad }
FutureMonad
형은 무척 강력한 도구입니다. Task
, Future
, IO
뿐만 아니라 무엇이든지 FutureMonad
와 함께 사용할 수 있습니다. 다음과 같이 flatMap
을 스택-안전하는 것도 가능합니다.
/** 피보나치 수열의 N번째 숫자를 구합니다. */ def fib[F[_]](n: Int)(implicit F: Monad[F]): F[BigInt] = { def loop(n: Int, a: BigInt, b: BigInt): F[BigInt] = F.flatMap(F.pure(n)) { n => if (n <= 1) F.pure(b) else loop(n - 1, b, a + b) } loop(n, BigInt(0), BigInt(1)) } // 사용법 { // 유효범위(scope)를 만듭니다. import FutureMonad.instance import scala.concurrent.ExecutionContext.Implicits.global // 실행 fib[Future](40).foreach(r => println(s"결과: $r")) //=> 결과: 102334155 }
원주 이 코드는 그저 장난 수준입니다. 본격적인 프로젝트를 보고 싶다면 Typelevel의 Typelevel's Cats를 참고하세요.
모나드는 실행의 순서를 정의합니다. 하지만 동시에 실행 가능한 서로 독립적인 연산들의 결과를 합쳐야 할 때도 있습니다. 그런 경우에는 Monad
보다 Applicative
가 적절할 수 있습니다.
먼저 간단한 타입백과사전(Typeclassopedia)을 만들어 봅시다. 9)
trait Functor[F[_]] { /** 이 코드는 독자 여러분 모두 이해하리라고 믿습니다. */ def map[A,B](fa: F[A])(f: A => B): F[B] } trait Applicative[F[_]] extends Functor[F] { /** 생성자 (`A`를 `F[A]` Applicative로 리프팅합니다.) */ def pure[A](a: A): F[A] /** 두 참조에 대해서 동시에 map을 실행합니다. * * 다른 구현체에서는 Applicative 연산자가 `ap`일 수 있습니다. * 하지만 `map2` 구현이 훨씬 이해하기 쉽습니다. */ def map2[A,B,R](fa: F[A], fb: F[B])(f: (A,B) => R): F[R] } trait Monad[F[_]] extends Applicative[F] { def flatMap[A,B](fa: F[A])(f: A => F[B]): F[B] }
이어서 Future
를 구현합니다.
class FutureMonad(implicit ec: ExecutionContext) extends Monad[Future] { def pure[A](a: A): Future[A] = Future.successful(a) def flatMap[A,B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f) def map2[A,B,R](fa: Future[A], fb: Future[B])(f: (A,B) => R): Future[R] = // flatMap에 기반하지 않은 구현체를 위한 함수이지만, // Task는 해당되지 않습니다 ;-) for (a <- fa; b <- fb) yield f(a,b) } object FutureMonad { implicit def instance(implicit ec: ExecutionContext): FutureMonad = new FutureMonad }
이제 Future
, Task
등과 사용할 수 있는 제네릭 함수를 Application
에 기반해서 정의할 수 있습니다.
def sequence[F[_], A](list: List[F[A]]) (implicit F: Applicative[F]): F[List[A]] = { val seed = F.pure(List.empty[A]) val r = list.foldLeft(seed)((acc,e) => F.map2(acc,e)((l,a) => a :: l)) F.map(r)(_.reverse) }
원주 다시 한 번 강조하지만, 이 코드는 단지 장난 수준일 뿐입니다. 본격적인 프로젝트를 보고 싶다면 Typelevel의 Cats를 참고하세요.
지금까지 살펴 본 내용에서 빠진 부분은 실제로 계산을 시작하고 결괏값을 받아오는 방법입니다. Scala의 Future
를 생각해 보면, 우리는 onComplete
를 추상화해야 하는 것입니다. Monix의 Task
로 치자면 runAsync
를 추상화하는 것이기도 합니다. Haskell이나 Scalaz의 IO
에서는 unsafePerformIO
입니다.
FS2에서는 Effect라는 타입 클래스를 정의했습니다. 다음은 그 구현을 간략화한 코드입니다.
trait Effect[F[_]] extends Monad[F] { def unsafeRunAsync[A](fa: F[A])(cb: Try[A] => Unit): Unit }
우리가 처음에 만들었던 Async
형과 비슷한 것 같기도 합니다. Future.onComplete
, Task.runAsync
와는 더욱 비슷합니다. IO.unsafePerformIO
에 적용할 수 있을 것 같기도 합니다.
하지만 이것은 다음의 이유로 진짜 타입 클래스라고 하기는 어렵습니다.
Show
와 같이 유용하지만 불규칙적인 타입 클래스도 분명 존재합니다.) 그보다는 더 중요한 이유가 있습니다.
실행 문맥(execution context)을 지정하는 방법은 구현 방식에 따라 다릅니다. Java는 Executor
를 사용합니다. Scala의 Future
는 ExecutionContext
를 쓰고, Monix는 ExecutionContext
를 향상시킨 Scheduler
를 사용합니다. FS2와 Scalaz는 스레드를 포크(fork)할 때 Executor
의 래퍼(wrapper)인 Strategy
를 사용하지만 unsafePerformIO
나 runAsync
가 호출될 때 실행 문맥(execution context)을 변경하지는 않습니다. (그래서 사실 Scalaz의 많은 콤비네이터들은 안전하지 않습니다.)
우리는 이와 같은 전략을 Future
에 대해서도 취할 수 있습니다. 유효범위(scope)로부터 implicit whatever: Context
를 받아서 타입 클래스의 인스턴스를 생성하는 방식입니다. 하지만 이러한 방식은 약간 이상하고 비효율적입니다. 게다가 이 방식을 따르면 Effect.unsafePerformIO
만으로 flatMap
을 정의할 수 없고, 그렇다면 그 인스턴스가 반드시 Monad
를 상속한다고 보기울 것입니다.
그래서 처음 주어진 질문(“비동기 계산을 위한 타입 클래스를 정의할 수 있을까요?”)에 대해 저는 확신이 없습니다. Cats에 도입할 수 있는 아이디어가 있다면 저도 꼭 듣고 싶습니다.
여러분이 이 사고 실험을 재밌게 즐기셨다면 좋겠습니다. 설계는 늘 즐겁죠. 😎
어떤 추상화는 다른 추상화보다 더 일반적인 목적을 가지고 있습니다. 저는 개인적으로 “특정한 작업에는 특정한 최적의 도구가 있다”는 사고방식이 잘못된 선택을 정당화하기 위해 남용되고 있다고 생각합니다.
이 시점에서 우리는 Rúnar Bjarnason의 <제약은 자유이며, 자유는 제약이다>를 보아야 합니다. 병행성(concurrency)을 추상화하는 작업의 핵심을 정확하게 짚는 멋진 발표입니다.
이미 말했듯이, 병행성(concurrency)을 처리함에 있어 모든 상황에 일반적으로 적용될 수 있는 '은 총알'은 없습니다. 더 고수준의 추상화일수록, 문제를 해결하는 데 필요한 유효범위(scope)는 좁아집니다. 그리고 모델의 유효범위가 좁아지고 기능이 줄어들수록, 그 모델은 간단해지고 결과를 합치기도 쉬워집니다. 예를 들어서 Scala 커뮤니티의 개발자들은 Akka의 행위자 모델(actor model)을 과도하게 사용하는 경향이 있습니다. (물론 Akka는 잘못 쓰지만 않는다면 훌륭한 라이브러리입니다.) 그러나 Future
나 Task
로도 같은 기능을 구현할 수 있다면 Akka의 Actor
는 굳이 사용하지 않는 편이 낫습니다. Monix와 ReactiveX의 Observable
등과 같은 다른 추상화도 마찬가지입니다.
그리고 이 두 가지 규칙을 마음에 새기도록 합시다.
마지막으로 이것만 말하겠습니다. 병행성(concurrency) 전문가들은 애초에 병행성이 생기지 않도록 하는 데 그 누구보다 전문적입니다. 💀
setTimeout
을 사용한 스케줄링에 관해서는 역자가 제6회 D2 CAMPUS SEMINAR에서 자세하게 발표했었습니다. 관련 자료는 D2 공식 사이트에 등록되어 있습니다.