문서의 이전 판입니다!
비동기(asynchrony)는 어떤 곳에서도 사용될 수 있는, 병행성(concurrency)을 포괄하는 개념입니다. 이 글은 비동기 처리가 무엇인지, 어떤 문제를 해결해야 하는지에 관해 다룹니다. 1)
비동기는 멀티스레딩(multithreading)보다 넓은 개념임에도 불구하고, 일부 개발자들은 비동기와 멀티스레딩을 혼동합니다. 비동기와 멀티스레딩의 관계는 다음과 같습니다.
Multithreading <: Asynchrony
비동기 연산은 다음과 같이 형(type)을 통해서 표현할 수도 있습니다.
type Async[A] = (Try[A] => Unit) => Unit
여러 개의 Unit
반환형(return type)이 더럽다고 느껴지는 이유는, 바로 비동기가 더럽기 때문입니다. 다음과 같은 특성을 갖는 모든 작업, 프로세스, 네트워크 상의 한 노드를 비동기 연산이라고 할 수 있습니다.
비동기가 병행성(concurrency)을 포괄하긴 하지만, 그것이 필연적으로 멀티스레딩을 포괄하는 것이 아님은 매우 중요합니다. 예를 들어, JavaScript에서는 대부분의 I/O 동작이 비동기적일 뿐만 아니라, 아주 무거운 비지니스 로직도 (setTimeout
에 기반한 스케쥴링을 통해) 비동기적으로 처리될 수 있기 때문에, 인터페이스가 계속해서 응답할 수 있습니다. 2)
프로그램에 비동기를 도입하는 일은, 병행성 문제(concurrency problems)를 일으킵니다. 우리는 비동기 연산이 언제 끝날지 절대로 알 수 없어서, 동시에 작동한 여러 개의 비동기 연산의 결과를 모으는 일은 동기화(synchronization)를 필요로 하게 되기 때문입니다. 동기화 작업은 프로그램이 더 이상 동작의 순서에 의존하지 않도 하는 작업이기도 한데, 순서로부터의 독립은 바로 비결정론적 알고리즘의 핵심 요소이기도 합니다.
영문 위키피디아, Nondeterministic algorithm: 비결정론적(nondeterministic) 알고리즘은 같은 입력에 대해서도 새로운 실행에서는 이전과 다르게 동작할 수 있는 알고리즘을 가리킨다. 결정론적(deterministic) 알고리즘과 반대 관계이다. … 병행(concurrent) 알고리즘은 경쟁 상태(race condition)로 인해 새로운 실행에서 이전과 다르게 동작할 수 있다.
실력 있는 독자라면, 사용되는 방식과 환경에 따라서 조금씩의 차이는 있지만 이러한 종류의 문제가 어디에서나 발견된다는 사실을 눈치 챌 수 있습니다.
onComplete
를 구현하도록 정의된 Scala의 Future위의 모든 추상화들은 비동기를 처리하는 조금 나은 방법들입니다.
비동기적인 결과를 동기적인 결과로 변환하는 함수는 흔히 아래와 같이 기술됩니다.
def await[A](fa: Async[A]): A
그러나 우리는 비동기 처리가 다른 일반적인 함수들과 같다고 가정해서는 안됩니다. 그 이유는 CORBA의 실패로부터도 배울 수 있습니다.
비동기 처리에 있어서 우리는 흔히 분산 컴퓨팅의 함정 (fallacies of distributed computing)에 빠지곤 합니다.
당연히 위의 명제는 모두 거짓입니다. 다시 말해, 우리는 흔히 네트워크 상에서 발생한 오류, 레이턴시, 패킷 손실, 대역폭 제한을 충분히 고려하지 않은 코드를 작성한다는 의미입니다.
개발자들은 아래와 같은 방식으로 이러한 문제에 대처해 왔습니다.
이렇게 해결법이 많다는 것은, 그 중에 무엇도 일반적인 목적으로 비동기를 처리하기에 적합하지 않다는 것을 의미합니다. 메모리 관리와 병렬성(concurrency) 처리는 우리 소프트웨어 개발자들이 마주하고 있는 가장 큰 문제라는 점에서, '은 총알은 없다' 딜레마와도 관련이 있습니다.
원주: 경고 - 개인적인 의견과 불평입니다.
어떤 사람들은 Golang과 같은 M:N 플랫폼을 자랑합니다만, 저는 JVM이나 .Net과 같은 1:1 멀티스레딩 플랫폼을 좋아합니다.
프로그래밍 언어에 충분한 표현성(Scala의 Future와 Promise, Task, Closure의 core.async)이 있는 한, 우리는 1:1 멀티스레딩 플랫폼 위에 M:N 플랫폼도 구현할 수 있기 때문입니다. 하지만 M:N 런타임에서 M:N 멀티스레딩에 문제가 발생했을 때는, 플랫폼을 바꾸지 않고서는 문제를 해결할 수 없습니다. 그리고 실제로 대부분의 M:N 플랫폼은 자주 문제가 생깁니다.
모든 가능한 해결법을 배우고 결정을 내리는 것은 실로 엄청나게 고통스러운 작업이지만, 그래도 충분히 알지 못하고 결정을 내리는 것보다는 낫습니다. 이 문제에 한해서는 TOOWTDI 혹은 “나쁜 기능이 더 낫다”를 신념으로 갖지 않는 것이 좋습니다. Scala나 Haskell과 같은 표현적인(expressive) 신생 언어의 학습 난이도가 너무 높다는 불평은 문제의 요점과 떨어져 있습니다. 새로운 언어를 배우는 것은 병행성(concurreny)을 다루는 데에 있어서 가장 사소한 문제이기 때문입니다. 저는 심지어 병행성의 문제를 해결하지 못해 소프트웨어 업계를 떠난 사람들도 알고 있습니다.
예를 들어서, 두 개의 비동기 작업을 시작하고 그 결과들을 합친다고 합시다.
먼저 작업을 비동기적으로 실행하는 함수를 정의합니다.
import scala.concurrent.ExecutionContext.global type Async[A] = (A => Unit) => Unit def timesTwo(n: Int): Async[Int] = onFinish => { global.execute(new Runnable { def run(): Unit = { val result = n * 2 onFinish(result) } }) } // 사용법 timesTwo(20) { result => println(s"결과: $result") } //=> 결과: 40
하나의 실행이 끝나면 다음을 연속해서 실행하는 방법으로 다음과 같이 두 개의 비동기 결괏값을 합칠 수 있습니다.
def timesFour(n: Int): Async[Int] = onFinish => { timesTwo(n) { a => timesTwo(n) { b => // 두 개의 결과를 합친다 onFinish(a + b) } } } // 사용법 timesFour(20) { result => println(s"결과: $result") } //=> 결과: 80
두 개의 결괏값을 합치는 정도는 간단합니다.
하지만 비동기성은 관련된 모든 것들에 영향을 미친다는 문제가 있습니다. 여러분이 다음과 같은 순수 함수를 만들었다고 합시다.
def timesFour(n: Int): Int = n * 4
하지만 EJB를 사용하려는 EA가 여러분에게 순수 함수가 아닌 비동기 timesTwo
함수를 사용하라고 지시합니다. 이제 여러분의 timesFour
구현은 순수한 수학적 함수에서 side-effect가 넘쳐나는 함수가 될 수밖에 없습니다. 충분히 좋은 Async
형이 없다면 우리는 side-effect가 넘쳐나는 콜백으로 이루어진 작업 흐름을 다뤄야 합니다. 물론 결괏값이 나올 때까지 봉쇄(block)하는 것도 도움이 될 수 없습니다. 그저 문제를 숨기는 것일 뿐이니까요. 2장을 보면 이유를 알 수 있습니다.
게다가 문제는 점점 더 복잡해집니다. 😷
위의 '연속 실행'에서의 두 번째 실행은 첫 번째 실행에 의존하지 않습니다. 즉, 두 실행은 병렬적(parallel)으로 처리될 수 있습니다. JVM에서는 CPU-bound 작업3)들을 병렬적으로 실행할 수 있고, JavaScript에서도 Ajax 요청을 보내거나 Web Worker API를 사용할 때 병렬 처리가 가능합니다.
안타깝게도 병렬 실행에서는 일이 조금 복잡해집니다. 아래와 같은 순진한 접근 방법은 완전히 틀렸습니다.
// 영 좋지 못한 예시 def timesFourInParallel(n: Int): Async[Int] = onFinish => { var cacheA = 0 timesTwo(n) { a => cacheA = a } timesTwo(n) { b => // 두 개의 결과를 합친다 onFinish(cacheA + b) } } timesFourInParallel(20) { result => println(s"결과: $result") } //=> 결과: 80 timesFourInParallel(20) { result => println(s"결과: $result") } //=> 결과: 40
바로 이것이 비결정론의 실례(實例)입니다. 두 개의 작업이 종료되는 순서가 보장되지 않기 때문에 우리는 동기화를 수행하는 소규모의 상태 기계(state machine)를 구현해야 합니다.
먼저 상태 기계의 ADT를 정의합니다.
// 상태 기계 정의 sealed trait State // 초기 상태 case object Start extends State // B가 종료되어 A를 대기하는 상태 final case class WaitForA(b: Int) extends State // A가 종료되어 B를 대기하는 상태 final case class WaitForB(a: Int) extends State
다음으로, 상태 기계를 비동기적으로 구현합니다.
// JVM에서는 영 좋지 못한 예시 (JavaScript에서는 돌아감) def timesFourInParallel(n: Int): Async[Int] = { onFinish => { var state: State = Start timesTwo(n) { a => state match { case Start => state = WaitForB(a) case WaitForA(b) => onFinish(a + b) case WaitForB(_) => // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록 throw new IllegalStateException(state.toString) } } timesTwo(n) { b => state match { case Start => state = WaitForA(b) case WaitForB(a) => onFinish(a + b) case WaitForA(_) => // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록 throw new IllegalStateException(state.toString) } } } }
위의 해결법을 시각화하면 아래와 같은 상태 기계로 나타낼 수 있습니다.
하지만 아직 문제가 남아있습니다. JVM은 1:1 멀티스레딩 플랫폼이기 때문에 공유 메모리에 병행적으로 접근(shared memory concurrency)하게 됩니다. 따라서 state
에 대한 접근이 동기화되어야 합니다.
사용할 수 있는 한 가지 해결 방법은 synchronized
블록을 사용하는 것입니다. 이 방법은 '암묵적인 락(intrinsic lock)'이라고도 부릅니다.
// 모니터처럼 작동할 수 있는 공통 참조 생성 val lock = new AnyRef var state: State = Start timesTwo(n) { a => lock.synchronized { state match { case Start => state = WaitForB(a) case WaitForA(b) => onFinish(a + b) case WaitForB(_) => // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록 throw new IllegalStateException(state.toString) } } } //...
이러한 고수준의 락(high-level lock)은 여러 개의 스레드가 병렬적(parallel)으로 (위의 state
와 같은) 리소스에 접근하지 못하도록 보호합니다. 하지만 저는 개인적으로 고수준 락의 사용을 피합니다. 커널의 스케쥴러는 임의의 스레드를 임의의 이유로 정지시킬 수 있는데, 락을 갖고 있는 스레드가 커널에 의해 정지되면 다른 모든 스레드가 정지되기 때문입니다. 정지를 피하고 끊임 없이 작업을 실행하기 위해서는 비봉쇄 알고리즘(non-blocking logic)이 더 낫습니다.
그래서 이 경우에는 AtomicReference를 사용하는 것이 정답입니다.
// JVM에서 잘 돌아가는 예시 import scala.annotation.tailrec import java.util.concurrent.atomic.AtomicReference def timesFourInParallel(n: Int): Async[Int] = { onFinish => { val state = new AtomicReference[State](Start) @tailrec def onValueA(a: Int): Unit = state.get match { case Start => if (!state.compareAndSet(Start, WaitForB(a))) onValueA(a) // 재시도 case WaitForA(b) => onFinish(a + b) case WaitForB(_) => // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록 throw new IllegalStateException(state.toString) } timesTwo(n)(onValueA) @tailrec def onValueB(b: Int): Unit = state.get match { case Start => if (!state.compareAndSet(Start, WaitForA(b))) onValueB(b) // 재시도 case WaitForB(a) => onFinish(a + b) case WaitForA(_) => // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록 throw new IllegalStateException(state.toString) } timesTwo(n)(onValueB) } }
이제 조금 어려워졌나요? 조금 더 힘내봅시다.
위의 onFinish
의 호출이 스택-안전하지 않고(stack-unsafe), 호출 시점에서 비동기 범위(asynchronous boundary)를 강제하지 않으면 프로그램이 StackOverflowError
로 터질 수 있다고 한다면, 어떤 생각이 드시나요? 4)
말 그대로의 의미입니다. 앞서 작성한 코드를 제네릭한 방법(generic way)으로 다시 작성해 봅시다.
import scala.annotation.tailrec import java.util.concurrent.atomic.AtomicReference type Async[+A] = (A => Unit) => Unit def mapBoth[A,B,R](fa: Async[A], fb: Async[B])(f: (A,B) => R): Async[R] = { // 상태 기계 정의 sealed trait State[+A,+B] // 초기 상태 case object Start extends State[Nothing, Nothing] // B가 종료되어 A를 대기하는 상태 final case class WaitForA[+B](b: B) extends State[Nothing,B] // A가 종료되어 B를 대기하는 상태 final case class WaitForB[+A](a: A) extends State[A,Nothing] onFinish => { val state = new AtomicReference[State[A,B]](Start) @tailrec def onValueA(a: A): Unit = state.get match { case Start => if (!state.compareAndSet(Start, WaitForB(a))) onValueA(a) // 재시도 case WaitForA(b) => onFinish(f(a,b)) case WaitForB(_) => // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록 throw new IllegalStateException(state.toString) } @tailrec def onValueB(b: B): Unit = state.get match { case Start => if (!state.compareAndSet(Start, WaitForA(b))) onValueB(b) // retry case WaitForB(a) => onFinish(f(a,b)) case WaitForA(_) => // async 작업이므로 catch 되지는 않지만, 예외 처리는 되도록 throw new IllegalStateException(state.toString) } fa(onValueA) fb(onValueB) } }
그리고 Scala의 Future.sequence
와 유사한 기능을 구현합시다. 우리의 의지는 강인하고 용기는 무한하니까요. 😇
def sequence[A](list: List[Async[A]]): Async[List[A]] = { @tailrec def loop(list: List[Async[A]], acc: Async[List[A]]): Async[List[A]] = list match { case Nil => onFinish => acc(r => onFinish(r.reverse)) case x :: xs => val update = mapBoth(x, acc)(_ :: _) loop(xs, update) } val empty: Async[List[A]] = _(Nil) loop(list, empty) } // 호출 sequence(List(timesTwo(10), timesTwo(20), timesTwo(30))) { r => println(s"결과: $r") } //=> 결과: List(20, 40, 60)
이것으로 끝났을까요? 아닙니다.
val list = 0.until(10000).map(timesTwo).toList sequence(list)(r => println(s"합: ${r.sum}"))
자, 위의 코드를 실행시키고 영광스러운 메모리 오류를 목도하십시오. 프로덕션 환경이었다면 아래의 오류는 Scala의 NonFatal
로 처리되지도 않아서 프로그램이 크래시를 일으켰을 겁니다.
java.lang.StackOverflowError at java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:2414) at java.util.concurrent.ForkJoinPool.execute(ForkJoinPool.java:2630) at scala.concurrent.impl.ExecutionContextImpl$$anon$3.execute(ExecutionContextImpl.scala:131) at scala.concurrent.impl.ExecutionContextImpl.execute(ExecutionContextImpl.scala:20) at .$anonfun$timesTwo$1(<pastie>:27) at .$anonfun$timesTwo$1$adapted(<pastie>:26) at .$anonfun$mapBoth$1(<pastie>:66) at .$anonfun$mapBoth$1$adapted(<pastie>:40) at .$anonfun$mapBoth$1(<pastie>:67) at .$anonfun$mapBoth$1$adapted(<pastie>:40) at .$anonfun$mapBoth$1(<pastie>:67) at .$anonfun$mapBoth$1$adapted(<pastie>:40) at .$anonfun$mapBoth$1(<pastie>:67)
앞서 말씀드린 것과 같이, 강제된 비동기 범위 없이 실행된 onFinish
는 스택 오버플로우를 유발할 수 있습니다. JavaScript에서는 setTimeout
을 사용한 스케쥴링으로 문제를 해결할 수 있으며, JVM에서는 스레드 풀을 사용하거나 Scala의 ExecutionContext
를 사용할 수 있습니다. 5)
scala.concurrent.Future
는 엄격하게 계산되는 비동기 연산을 기술하는 방법입니다. 지금까지 위에서 사용한 Async
형과도 유사합니다.
Futures and promises, 영문 위키피디아: Future와 Promise는 일부 병행 프로그래밍 언어(concurrent programming language)에서 프로그램의 실행을 동기화하기 위해서 사용하는 생성자이다. Future와 Promise가 기술하는 객체는 연산이 아직 완료되지 않아 초기에는 계산할 수 없는 값에 대한 대리자(proxy)이다.
원주: 필자의 불평입니다.
Future와 Promise에 관한docs.scala-lang.org
의 문서의 다음 설명은 오해를 불러일으키기 쉽고 오개념의 원인이 됩니다.
“Future는 여러 동작을 병렬적(paraell)으로 수행하는 방법을 제공합니다. 이러한 방법은 효율적이며 비봉쇄적(non-blocking)입니다.”
Future
형은 비동기(asynchrony)를 기술하는 것이지, 병렬성(parallelism)을 기술하는 것이 아닙니다. 물론Future
를 사용해서 병렬처리를 하는 것도 가능하지만, 그렇다고 그것이 병렬처리만을 할 수 있다는 뜻은 아닙니다. 게다가 CPU의 성능을 최대로 끌어올릴 수 있는 방법을 찾는 사람들에게 있어서Future
는 비용이 크고 현명하지 못한 선택입니다. (4.4절 참고)
Future
는 다음과 같이 두 개의 주요 연산(primary operation)과, 주요 연산을 기반으로 정의되는 여러 함수 콤비네이터들로 정의되는 인터페이스입니다.
import scala.util.Try import scala.concurrent.ExecutionContext trait Future[+T] { // abstract def value: Option[Try[T]] // abstract def onComplete(f: Try[T] => Unit)(implicit ec: ExecutionContext): Unit // 값 변형 def map[U](f: T => U)(implicit ec: ExecutionContext): Future[U] = ??? // 연속 실행 ;-) def flatMap[U](f: T => Future[U])(implicit ec: ExecutionContext): Future[U] = ??? // ... }
Future
의 특성은 다음과 같습니다.
value
속성은 결괏값을 메모아이즈하기 위해 존재합니다. 아직 연산이 완료되지 않은 경우에는 None
이 할당됩니다. 당연한 이야기지만, value
속성의 def value
를 호출하면 비결정론적인 결과를 얻습니다.
다음은 ExecutionContext
에 대한 부연설명입니다.
ExecutionContext
는 비동기 실행을 관리합니다. 스레드 풀과 유사하다고 할 수 있겠지만, 반드시 스레드 풀의 역할만을 하는 것은 아닙니다. (비동기는 멀티스레딩이나 병렬성과 다르니까요)onComplete
는 기본적으로 우리가 위에서 정의한 Async
형과 유사하지만, ExecutionContext
를 인수로 받습니다. 모든 완료 콜백은 비동기적으로 호출되어야 하기 때문입니다.onComplete
를 기반으로 구현되었기 때문에 ExecutionContext
를 인수로 받습니다.
왜 모든 시그니처에 ExecutionContext
를 붙여야 하는지 아직 이해하지 못하겠다면 3.3절을 다시 읽어보세요.
3장에서 만든 함수를 Future
로 다시 정의해 봅시다.
import scala.concurrent.{Future, ExecutionContext} def timesTwo(n: Int)(implicit ec: ExecutionContext): Future[Int] = Future(n * 2) // 사용법 { import scala.concurrent.ExecutionContext.Implicits.global timesTwo(20).onComplete { result => println(s"결과: $result") } //=> 결과: Success(40) }
아주 쉽습니다. Future.apply
메서드는 주어진 ExecutionContext
에서 주어진 연산을 수행합니다. JVM에서 global
컨텍스트를 선택했으므로, 연산은 별개의 스레드에서 실행됩니다.
3.1절의 연속 실행은 다음과 같이 구현할 수 있습니다.
def timesFour(n: Int)(implicit ec: ExecutionContext): Future[Int] = for (a <- timesTwo(n); b <- timesTwo(n)) yield a + b // 사용법 { import scala.concurrent.ExecutionContext.Implicits.global timesFour(20).onComplete { result => println(s"결과: $result") } //=> 결과: Success(80) }
역시 아주 쉬워졌습니다. 위의 for-컴프리헨션은 아래와 같이 flatMap
과 map
으로 풀어서 쓸 수도 있습니다.
def timesFour(n: Int)(implicit ec: ExecutionContext): Future[Int] = timesTwo(n).flatMap { a => timesTwo(n).map { b => a + b } }
프로젝트에서 scala-async를 사용하면 다음과 같이 할 수도 있습니다.
import scala.async.Async.{async, await} def timesFour(n: Int)(implicit ec: ExecutionContext): Future[Int] = async { val a = await(timesTwo(a)) val b = await(timesTwo(b)) a + b }
scala-async
라이브러리는 매크로를 이용해서 여러분의 코드를 flatMap
과 map
을 이용하는 코드로 변환합니다. 다시 말해서, await
는 스레드를 봉쇄(block)하지 않습니다. 그럴 것 같은 인상을 주더라도 말이죠.
scala-async
는 정말로 훌륭해 보이지만, 안타깝게도 여러가지 한계가 있습니다. 예를 들어서 익명 함수 안에 있는 await
는 변환되지 않습니다. 아쉽지만 Scala 코드는 보통 그런 표현식으로 넘쳐납니다. 다음과 같은 코드도 작동하지 않습니다.
// 영 좋지 못한 예시 def sum(list: List[Future[Int]])(implicit ec; ExecutionContext): Future[Int] = async { var sum = 0 // for는 foreach로 변환되기 때문에 작동하지 않습니다 for (f <- list) { sum += await(f) } }
scala-async
의 접근법은 마치 1급 지속을 구현한 것과 같은 착각을 일으킵니다. 하지만 그러한 지속은 안타깝게도 1급이 아니고, 그저 컴파일러에 의해서 관리되는 코드 재작성일 뿐입니다. 그리고 이러한 한계는 C#과 ECMAScript에도 똑같이 적용됩니다. 그리고 이것은 안타깝게도 함수형 프로그래밍에서 async
키워드가 그다지 많이 쓰일 수 없음을 의미합니다.
'은 총알은 없다'는 저의 불평이 다시 떠오른다면 좋겠습니다. 😞
3.2절에서와 마찬가지로 두 번의 함수 호출은 서로에 대해 독립적이기 때문에 병렬적으로 실행할 수 있습니다. 초보자에게는 계산 구문(evaluation semantics)이 조금 난해할 수 있지만, Future
를 사용하면 이 작업은 더 쉬워집니다.
def timesFourInParallel(n: Int)(implicit ec: ExecutionContext): Future[Int] = { // Future는 조급하게 계산되므로, 둘 모두 값을 합치기 전에 실행됩니다 val fa = timesTwo(n) val fb = timesTwo(n) for (a <- fa; b <- fb) yield a + b // fa.flatMap(a => fb.map(b => a + b)) }
이러한 방법은 초보자에게는 조금 헷갈릴 수 있습니다. 작업이 병렬적(parallel)으로 실행되도록 하기 위해서는 간단히 결괏값을 합치기 이전에 Future
를 생성하면 됩니다.
다른 방법으로, 임의의 컬렉션에 대해 Future.sequence
를 사용할 수도 있습니다.
def timesFourInParallel(n: Int)(implicit ec: ExecutionContext): Future[Int] = Future.sequence(timesTwo(n) :: timesTwo(n) :: Nil).map(_.sum)
이 방법도 초보자에게는 조금 충격적일 수도 있습니다. 여기서 Future
들은 그 컬렉션이 엄격(strict)하게 정의되었을 때에만 병렬적(parallel)으로 실행되기 때문입니다. 이때 '엄격하게'라는 말은 Scala의 Stream
이나 Iterator
등으로 정의되지 않았음을 의미합니다. 이러한 표현이 초보자에게는 불명확한 탓도 있을 듯합니다.
Future
형은 재귀 동작에 대해서 완전히 안전합니다. (콜백 호출을 ExecutionContext
에 의존하고 있기 때문입니다.) 3.3절의 예시를 다시 작성해 봅시다.
def mapBoth[A,B,R](fa: Future[A], fb: Future[B])(f: (A,B) => R) (implicit ec: ExecutionContext): Future[R] = { for (a <- fa; b <- fb) yield f(a,b) } def sequence[A](list: List[Future[A]]) (implicit ec: ExecutionContext): Future[List[A]] = { val seed = Future.successful(List.empty[A]) list.foldLeft(seed)((acc,f) => for (l <- acc; a <- f) yield a :: l) .map(_.reverse) } // 사용법 { import scala.concurrent.ExecutionContext.Implicits.global sequence(List(timesTwo(10), timesTwo(20), timesTwo(30))).foreach(println) // => List(20, 40, 60) }
이번에는 StackOverflowError
가 발생하지 않습니다.
val list = 0.until(10000).map(timesTwo).toList sequence(list).foreach(r => println(s"합: ${r.sum}")) //=> 합: 99990000
Future
의 문제점은 onComplete
를 호출할 때마다 ExecutionContext
가 사용된다는 점입니다. 이러한 동작은 일반적으로 Runnable
이 스레드 풀로 전달하는 것이고, 논리 스레드를 포크(fork)하게 됩니다. CPU-bound 작업이 많다면 Future
의 구현은 성능에 재앙과도 같습니다. 스레드를 이동하는 동작은 문맥 교환(context switch)을 발생시키고 CPU 캐시의 참조 국소성(CPU cache locality)을 파괴하기 때문입니다. 물론 Future
의 구현은 어느정도의 최적화를 수행하긴 합니다. 내부 콜백으로 인해서 새 스레드가 포크되지 않도록 flatMap
은 내부 실행 맥락(internal execution context)을 사용합니다. 하지만 그러한 최적화로는 부족하고, 벤치마크 결과는 거짓말을 하지 않습니다.
또 메모아이제이션이 이루어진다는 것은, 작업이 완료되었을 때 생산자(producer)와 소비자(listener)에서 각각 한 번 씩 AtomicReference.compareAndSet
이 호출된다는 의미입니다. 메모아이제이션이 멀티스레드 환경에서 잘 작동하도록 하는 이 호출들은 꽤 많은 비용이 듭니다.
다시 말해서, 여러분이 CPU의 성능을 최대로 끌어올려서 CPU-bound 작업을 수행하려고 한다면 Future
와 Promise
를 사용하는 것은 그다지 좋은 생각이 아닙니다.
Scala의 Future
와 Task
구현의 성능 비교는, 다음의 최근 벤치마크 결과로 볼 수 있습니다.
[info] Benchmark (size) Mode Cnt Score Error Units [info] FlatMap.fs2Apply 10000 thrpt 20 291.459 ± 6.321 ops/s [info] FlatMap.fs2Delay 10000 thrpt 20 2606.864 ± 26.442 ops/s [info] FlatMap.fs2Now 10000 thrpt 20 3867.300 ± 541.241 ops/s [info] FlatMap.futureApply 10000 thrpt 20 212.691 ± 9.508 ops/s [info] FlatMap.futureSuccessful 10000 thrpt 20 418.736 ± 29.121 ops/s [info] FlatMap.futureTrampolineEc 10000 thrpt 20 423.647 ± 8.543 ops/s [info] FlatMap.monixApply 10000 thrpt 20 399.916 ± 15.858 ops/s [info] FlatMap.monixDelay 10000 thrpt 20 4994.156 ± 40.014 ops/s [info] FlatMap.monixNow 10000 thrpt 20 6253.182 ± 53.388 ops/s [info] FlatMap.scalazApply 10000 thrpt 20 188.387 ± 2.989 ops/s [info] FlatMap.scalazDelay 10000 thrpt 20 1794.680 ± 24.173 ops/s [info] FlatMap.scalazNow 10000 thrpt 20 2041.300 ± 128.729 ops/s
CPU-bound 작업은 Monix Task가 Scala의 Future
에 비해 압도적인 성능을 보여주고 있습니다.
원주: 이 벤치마크 결과는 한정적입니다. 여전히Future
가 더 빠른 경우도 있고 (Monix의 Observer는 몇 가지 합당한 이유로 인해Future
를 사용하고 있습니다), 애초에 CPU-bound 작업의 성능이 크게 중요하지 않은 경우도 자주 있습니다. 예를 들어서 I/O 작업을 할 때는 처리량과 CPU 속도가 별 상관이 없습니다.
Task
is a data type for controlling possibly lazy & asynchronous computations, useful for controlling side-effects, avoiding nondeterminism and callback-hell.
The Monix library provides a very sophisticated Task implementation, inspired by the Task in Scalaz. Same concept, different implementation.
TheTask
type is also inspired by Haskell's IO monad, being in this author's opinion the true IO type for Scala.
This is a matter of debate, as Scalaz also exposes a separate IO type that only deals with synchronous execution. The Scalaz IO is not async, which means that it doesn't tell the whole story, because on top of the JVM you need to represent async computations somehow. In Haskell on the other hand you have the Async type which is converted to IO, possibly managed by the runtime (green-threads and all).
On the JVM, with the Scalaz implementation, we can't represent async computations with IO and without blocking threads on evaluation, which is something to avoid, because blocking threads is error prone.
In summary the Task
type:
Future
it doesn’t trigger the execution, or any effects until runAsync
Specific to the Monix implementation:
Task
sits in the design space:
Redefining our function from section 3 in terms of Task
:
import monix.eval.Task def timesTwo(n: Int): Task[Int] = Task(n * 2) // Usage { // Our ExecutionContext needed on evaluation import monix.execution.Scheduler.Implicits.global timesTwo(20).foreach { result => println(s"Result: $result") } //=> Result: 40 }
The code seems to be almost the same as the Future
version in section 4.1, the only difference is that our timesTwo
function no longer takes an ExecutionContext
as a parameter. This is because Task
references are lazy, being like functions, so nothing gets printed until the call to foreach
which forces the evaluation to happen. It is there that we need a Scheduler, which is Monix's enhanced ExecutionContext
.
Now to do sequencing like in section 3.1:
def timesFour(n: Int): Task[Int] = for (a <- timesTwo(n); b <- timesTwo(n)) yield a + b // Usage { // Our ExecutionContext needed on evaluation import monix.execution.Scheduler.Implicits.global timesFour(20).foreach { result => println(s"Result: $result") } //=> Result: 80 }
And just like with the Future
type, that “for comprehension” magic is translated by the Scala compiler to nothing more than calls to flatMap
and map
, literally equivalent with:
def timesFour(n: Int): Task[Int] = timesTwo(n).flatMap { a => timesTwo(n).map { b => a + b } }
The story for Task
and parallelism is better than with Future
, because Task
allows fine-grained control when forking tasks, while trying to execute transformations (e.g. map
, flatMap
) on the current thread and call-stack, thus preserving cache locality and avoiding context switches for what is in essence sequential work.
But first, translating the sample using Future
does not work:
// BAD SAMPLE (for achieving parallelism, as this will be sequential) def timesFour(n: Int): Task[Int] = { // Will not trigger execution b/c Task is lazy val fa = timesTwo(n) val fb = timesTwo(n) // Evaluation will be sequential b/c of laziness for (a <- fa; b <- fb) yield a + b }
In order to achieve parallelism ''Task'' requires you to be explicit about it:
def timesFour(n: Int): Task[Int] = Task.mapBoth(timesTwo(n), timesTwo(n))(_ + _)
Oh, does mapBoth
seem familiar? If those two tasks fork threads on execution, then they will get executed in parallel as mapBoth
starts them both at the same time.
Task is recursive and stack-safe (in flatMap
) and incredibly efficient, being powered by an internal trampoline. You can checkout this cool paper by Rúnar Bjarnason on Stackless Scala with Free Monads for getting a hint on how Task got implemented so efficiently.
The sequence
implementation looks similar with the one for Future
in section 4.3, except that you can see the laziness in the signature of sequence
:
def sequence[A](list: List[Task[A]]): Task[List[A]] = { val seed = Task.now(List.empty[A]) list.foldLeft(seed)((acc,f) => for (l <- acc; a <- f) yield a :: l) .map(_.reverse) } // Invocation { // Our ExecutionContext needed on evaluation import monix.execution.Scheduler.Implicits.global sequence(List(timesTwo(10), timesTwo(20), timesTwo(30))).foreach(println) // => List(20, 40, 60) }
When working with well grown functions such as map
, flatMap
and mapBoth
, we no longer care that underlying it all is an "(A => Unit) => Unit"
, because these functions are, assuming lawfulness, pure and referentially transparent. This means we can reason about them and their result, divorced from their surrounding context.
This is the great achievement of Haskell's IO
. Haskell does not “fake” side-effects, as functions returning IO
values are literally pure, the side-effects being pushed at the edges of the program where they belong. And we can say the same thing about Task
. Well, for Future
it's more complicated given its eager nature, but working with Future
is not bad either.
And can we build interfaces that abstract over such types as Task
, Future
, Coeval
, Eval
, IO
, Id
, Observable
and others?
Yes we can, we've already seen that flatMap
describes sequencing, while mapBoth
describes parallelism. But we can't describe them with classic OOP interfaces, for one because due to the covariance and contravariance rules of Function1
parameters we'd lose type info in flatMap
(unless you use F-bounded polymorphic types, which are more suitable for implementation reuse and aren't available in other OOP languages), but also because we need to describe a data constructor that can't be a method (i.e. OOP subtyping applies to instances and not whole classes).
Fortunately Scala is one of the very few languages capable of higher kinded types and with the ability to encode type-classes, which means we've got everything needed to port concepts from Haskell 😄
Author's Rant: The dreadedMonad
,Applicative
andFunctor
words strike fear in the hearts of the unfaithful, having given rise to the belief that they are “academic” notions disconnected from real-world concerns, with book authors going to great length to avoid using these words, which includes Scala's API documentation and official tutorials.
But this is a disservice to both the Scala language and its users. In other languages they are only design patterns that are hard to explain primarily because they can't be expressed as types. You can count the languages having this expressive capability with one hand. And users suffer because in case of trouble they don't know how to search for existing literature on the subject, having been deprived of learning the correct jargon.
I also feel this is a flavor of anti-intellectualism, as usual born out of fear of the unknown. You can see it coming from people that really know what they are doing, as none of us is immune. For example Java's Optional type violates the functor laws (e.g.opt.map(f).map(g) != opt.map(f andThen g))
, inSwift 5 == Some(5)
which is preposterous and good luck explaining to people thatSome(null)
actually makes sense for as long as null is a valid value of AnyRef and because otherwise you can't defineApplicative[Option]
.
This article is not about explaining Monads. There are other great articles for that. But if you're looking to build an intuition, here's another one: in the context of data types such as Future
or Task
, Monads describe sequencing of operations and is the only reliable way to ensure ordering.
Aleksey Shipilëv: “Observation: programmers doing concurrency with imperative languages are tripped by the unchallenged belief that ”;“ defines sequencing.”
A simple encoding of the Monad
type in Scala:
// We shouldn't need to do this :-( import scala.language.higherKinds trait Monad[F[_]] { /** Constructor (said to lift a value `A` in the `F[A]` * monadic context). Also part of `Applicative`, see below. */ def pure[A](a: A): F[A] /** FTW */ def flatMap[A,B](fa: F[A])(f: A => F[B]): F[B] }
And providing an implementation for Future
:
import scala.concurrent._ // Supplying an instance for Future isn't clean, ExecutionContext needed class FutureMonad(implicit ec: ExecutionContext) extends Monad[Future] { def pure[A](a: A): Future[A] = Future.successful(a) def flatMap[A,B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f) } object FutureMonad { implicit def instance(implicit ec: ExecutionContext): FutureMonad = new FutureMonad }
This is really powerful stuff. We can now describe a generic function that works with Task
, Future
, IO
, whatever, although it would be great if the flatMap
operation is stack-safe:
/** Calculates the N-th number in a Fibonacci series. */ def fib[F[_]](n: Int)(implicit F: Monad[F]): F[BigInt] = { def loop(n: Int, a: BigInt, b: BigInt): F[BigInt] = F.flatMap(F.pure(n)) { n => if (n <= 1) F.pure(b) else loop(n - 1, b, a + b) } loop(n, BigInt(0), BigInt(1)) } // Usage: { // Needed in scope import FutureMonad.instance import scala.concurrent.ExecutionContext.Implicits.global // Invocation fib[Future](40).foreach(r => println(s"Result: $r")) //=> Result: 102334155 }
PRO-TIP: this is just a toy example. For getting serious, see Typelevel's Cats
Monads define sequencing of operations, but sometimes we want to compose the results of computations that are independent of each other, that can be evaluated at the same time, possibly in parallel. There's also a case to be made that applicatives are more composable than monads 😏
Let's expand our mini Typeclassopedia to put on your wall:
trait Functor[F[_]] { /** I hope we are all familiar with this one. */ def map[A,B](fa: F[A])(f: A => B): F[B] } trait Applicative[F[_]] extends Functor[F] { /** Constructor (lifts a value `A` in the `F[A]` applicative context). */ def pure[A](a: A): F[A] /** Maps over two references at the same time. * * In other implementations the applicative operation is `ap`, * but `map2` is easier to understand. */ def map2[A,B,R](fa: F[A], fb: F[B])(f: (A,B) => R): F[R] } trait Monad[F[_]] extends Applicative[F] { def flatMap[A,B](fa: F[A])(f: A => F[B]): F[B] }
And to expand our Future
implementation:
// Supplying an instance for Future isn't clean, ExecutionContext needed class FutureMonad(implicit ec: ExecutionContext) extends Monad[Future] { def pure[A](a: A): Future[A] = Future.successful(a) def flatMap[A,B](fa: Future[A])(f: A => Future[B]): Future[B] = fa.flatMap(f) def map2[A,B,R](fa: Future[A], fb: Future[B])(f: (A,B) => R): Future[R] = // For Future there's no point in supplying an implementation that's // not based on flatMap, but that's not the case for Task ;-) for (a <- fa; b <- fb) yield f(a,b) } object FutureMonad { implicit def instance(implicit ec: ExecutionContext): FutureMonad = new FutureMonad }
So we can now define generic functions based on Applicative
which is going to work for Future
, Task
, etc:
def sequence[F[_], A](list: List[F[A]]) (implicit F: Applicative[F]): F[List[A]] = { val seed = F.pure(List.empty[A]) val r = list.foldLeft(seed)((acc,e) => F.map2(acc,e)((l,a) => a :: l)) F.map(r)(_.reverse) }
PRO-TIP: worth repeating, this is just a toy example. For getting serious, see Typelevel's Cats
Missing from above is a way to actually trigger an evaluation and get a value out. Thinking of Scala's Future
, we want a way to abstract over onComplete
. Thinking of Monix's Task we want to abstract over runAsync
. Thinking of Haskell's and Scalaz's IO, we want a way to abstract over unsafePerformIO
.
The FS2 library has defined a type-class called Effect that goes like this (simplified):
trait Effect[F[_]] extends Monad[F] { def unsafeRunAsync[A](fa: F[A])(cb: Try[A] => Unit): Unit }
This looks like our initial Async
type, very much similar with Future.onComplete
, with Task.runAsync
and could be applied to IO.unsafePerformIO
.
However, this is not a real type-class because:
Show
exist), the bigger problem is …StackOverflowError
, we need some sort of execution context or thread-pool that can execute tasks asynchronously without blowing up the stack
And such an execution context is different from implementation to implementation. Java will use Executor
, the Scala Future uses ExecutionContext
, Monix uses Scheduler
which is an enhanced ExecutionContext
, FS2 and Scalaz use Strategy
which wraps an Executor for forking threads and don't inject a context when their unsafePerformIO
or runAsync
gets called (which is why many of the Scalaz combinators are in fact unsafe), etc.
We could apply the same strategy as with Future
, to build the type-class instance by taking a implicit whatever: Context
from the scope. But that's a little awkward and inefficient. It's also telling that we can't define flatMap
only in terms of Effect.unsafePerformIO
, not without that execution context. And if we can't do it, then the type should probably not inherit from Monad
because it's not necessarily a Monad
.
So I'm personally not sure - if you have suggestions for what should be introduced in Cats, I'd love to hear them.
I do hope you enjoyed this thought experiment, designing things is fun 😎
Some abstractions are more general purpose than others and personally I think the mantra of “picking the right tool for the job” is overused to defend poor choices.
That said, there's this wonderful presentation by Rúnar Bjarnason called Constraints Liberate, Liberties Constrain that really drives the point home with concurrency abstractions at least.
As said, there is no silver bullet that can be generally applied for dealing with concurrency. The more high-level the abstraction, the less scope it has in solving issues. But the less scope and power it has, the simpler and more composable the model is. For example many developers in the Scala community are overusing Akka Actors - which is a great library, but not when misapplied. Like don't use an Akka Actor
when a Future
or a Task
would do. Ditto for other abstractions, like the Observable
pattern in Monix and ReactiveX.
Also learn by heart these 2 very simple rules:
avoid dealing with callbacks, threads and locks, because they are very error prone and not composable at all avoid concurrency like the plague it is And let me tell you, concurrency experts are first of all experts in avoiding concurrency 💀
setTimeout
을 사용한 스케줄링에 관해서는 역자가 제6회 D2 CAMPUS SEMINAR에서 자세하게 발표했었습니다. 관련 자료는 D2 공식 사이트에 등록되어 있습니다.