2016년 12월 13일 화요일

GenServer.call은 응답 메시지는 어떻게 구분할까.

Erlang/Elixir 를 처음 공부할때 프로세스 통신에서 조금 신기했습니다. 다른 언어들은 동기 통신을 설명하고 비동기 통신을 설명하는데 Erlang/Elixir 는 비동기 통신부터 설명하고 동기 통신에 대해선 GenServer의 call 를 제외하곤 잘 다루지 않고 GenServer.call 또한 내부적으로 비동기 호출을 이용해서 구현되어 있다고 합니다.

그렇다면 GenSever의 call은 어떻게 구현되어 있을까요. 그리고 더 나아가서 프로세스들이 주고받는 메시지는 어떻게 구분할까요.

시나리오

"프로세스들이 주고받는 메시지는 어떻게 구분할까" 를 살펴보기 위해 시나리오를 그려봤습니다.
만약 한 액터가 다른 액터에게 동기 메시지를 보내고 응답 메시지를 기다리는 도중에 다른 액터에게서 새로운 요청 메시지를 받았다면 그 액터는 이 메시지가 응답 메시지인지 요청 메시지인지 어떻게 구분할까.
의 상황을 말하고 싶었는데 제가 글솜씨가 떨어져서 무리가 있네요.


1) 독립된 프로세스인 AcotrA, AcotrB, AcotrC 를 만듭니다.
2) ActorB 는 ActorC로 call함수로 동기로 메시지를 보냅니다. (call)
3) 보낸 메시지는 ActorC의 mail box에 쌓입니다.
1) ActorB가 ActorC를 GenServer.call를 이용하여 동기 호출
4) ActorC는 mail box에서 메시지를 뽑아 처리를 합니다.
5) ActorB는 ActorC의 응답을 기다립니다.
2) ActorC는 ActorB로 부터 받은 메시지를 처리중, ActorB는 block 상태

6) ActorB가 ActorC의 메시지를 기다리는 동안 ActorA가 ActorB에게 메시지를 보냅니다.
7) ActorB는 ActorA로 부터 온 메시지는 무시하고 ActorC에게 응답이 올때까지 대기합니다.
3) ActorC에 의해 block된 ActorB로 ActorA가 메시지를 보냄
8) ActorC가 ActorB에게 응답 메시지를 보냅니다.
9) ActorB는 응답 메시지를 받아 대기를 풀고 응답에 메시지에 대한 처리를 합니다.
4) ActorC가 처리를 끝내고 응답을 줌, ActorB가 응답을 받고 후처리를 함





코드

그럼 이제 코드를 보면서 GenServer가 어떻게 비동기 호출을 이용해서 동기 호출 call을 구현했고, 어떻게 메시지들을 구분하는지 알아보겠습니다.


%% -------------------------------------------
%% gen_server.erl
%% -------------------------------------------

call(Name, Request) ->
    case catch gen:call(Name, '$gen_call', Request) of
 {ok,Res} ->
     Res;
 {'EXIT',Reason} ->
     exit({Reason, {?MODULE, call, [Name, Request]}})
    end.


일단 GenServer모듈의 gen_server.erl 소스를 보면 굉장이 간단하게 작성되어 있습니다. 모듈화 자체가 잘되어 있어서 gen_server.erl내에서 구현된것보다 외부 모듈을 사용하는것이 많습니다.
 


case catch gen:call(Name, '$gen_call', Request) of

우리가 보고싶은 call 함수도 직접적인 구현은 gen 모듈에 구현되어있고 gen_server.erl에서는 호출만 하고 있습니다.


%% -------------------------------------------
%% gen.erl
%% -------------------------------------------

do_call(Process, Label, Request, Timeout) ->
    try erlang:monitor(process, Process) of
 Mref ->
     catch erlang:send(Process, {Label, {self(), Mref}, Request},
    [noconnect]),
     receive
  {Mref, Reply} ->
      erlang:demonitor(Mref, [flush]),
      {ok, Reply};
  {'DOWN', Mref, _, _, noconnection} ->
      Node = get_node(Process),
      exit({nodedown, Node});
  {'DOWN', Mref, _, _, Reason} ->
      exit(Reason)
     after Timeout ->
      erlang:demonitor(Mref, [flush]),
      exit(timeout)
     end
    catch
 error:_ ->
     Node = get_node(Process),
     monitor_node(Node, true),
     receive
  {nodedown, Node} -> 
      monitor_node(Node, false),
      exit({nodedown, Node})
     after 0 -> 
      Tag = make_ref(),
      Process ! {Label, {self(), Tag}, Request},
      wait_resp(Node, Tag, Timeout)
     end
    end.



우리가 궁금해하는 동작은 gen.erl 에 do_call 함수안에 있습니다.
뭔가 좀 긴듯한 느낌이 들지만 애러 핸들링 하는 부분을 빼고나면 중요한 로직은 세부분만 남습니다.

첫번째 코드는 erlang:monitor를 이용해서 메시지를 보낼 프로세스의 모니터를 생성하는 부분입니다.
   try erlang:monitor(process, Process) of

모니터는 call를 호출할때마다 새로 생성하고 이때 리턴받은 레퍼런스를 메시지를 구분하는 아이디 용도로 요청을 보낼때 그리고 응답을 받았을때 사용합니다.

두번째는 erlang:send를 이용해서 메시지를 보내는 코드입니다. 여기서 눈여겨볼 부분은 메시지 안에 메시지를 보내는 나의 pid(self() 함수)와 첫번째 코드에서 생성한  Mref(레퍼런스)를 메시지와 합쳐서 보내는 부분입니다.

   catch erlang:send(Process, {Label, {self(), Mref}, Request},

이렇게 {self(), Mref} 를 메시지와 같이 보내서 요청받은 프로세스가 완료된 응답 메시지를 누구에게 다시 응답줄지를 결정하게 됩니다. 요청받은 프로세스는 요청을 다 완료한 후 self() 로 넘겨준 pid 에게 {Mref, Reply} 라는 메시지를 응답으로 보내게 됩니다.

그리고 그 응답 메시지를 받는 부분이 바로 세번째 코드인 receive 입니다.

   receive
     {Mref, Reply} ->
       erlang:demonitor(Mref, [flush]),
       {ok, Reply};

receive 는 원하는 패턴이 올때까지 대기를 하면서 기다립니다. 그리고 이 패턴은 내가 요청을 보낼때 보냈던 메시지 아이디 대용으로 사용한 레퍼런스 Mref를 포함한 응답 메시지 입니다. 


그리고 정상적으로 응답이 왔다면 레퍼런스를 이용해서 해당 프로세스에대한 모니터를 해제하고 응답 메시지를 리턴합니다.

receive의 erlang 도큐먼트를 보면 다음과 같은 설명이 있습니다.
Receive

receive
    Pattern1 [when GuardSeq1] ->
        Body1;
    ...;
    PatternN [when GuardSeqN] ->
        BodyN
end 
Receives messages sent to the process using the send operator (!). The patterns Pattern are sequentially matched against the first message in time order in the mailbox, then the second, and so on. If a match succeeds and the optional guard sequence GuardSeq is true, the corresponding Body is evaluated. The matching message is consumed, that is, removed from the mailbox, while any other messages in the mailbox remain unchanged.

The return value of Body is the return value of the receive expression.

receive never fails. The execution is suspended, possibly indefinitely, until a message arrives that matches one of the patterns and with a true guard sequence.

위 설명에서 제일 중요한 부분은 다른 메시지는 변경하지 않고 원하는 패턴의 메시지가 왔을때만 사용한다는 부분입니다. 바로 이런 특성 덕분에 프로세스는 내가 받은 요청 메시지들과 응답으로 받은 응답 메시지를 구분할 수 있습니다.


결론

Erlang/Elixir를 처음 공부할때 쉽게 오해할 수 있는 부분이 메일 박스가 단순한 메시지 큐이고 receive는 순서대로 메시지를 빼온다는것 입니다. (책을 꼼꼼히 읽는 분들은 아니겠지만..) 하지만 실제로는 그렇게 작동하지 않죠 메일 박스는 큐처럼 쌓이긴 하지만 앞에서부터 소비되지 않고 receive는 패턴매칭을 이용해서 원하는 메시지만 메일 박스에서 소비됩니다. 이런 작은 특성들이 복잡하게 보일수도 있었던 기능들을 간단하게 구현 가능하게 했습니다. 그러면서 도큐먼트를 자세히 읽어야 겠다는 반성을 하게 되네요..


2016년 3월 18일 금요일

Elixir 에서 PG2 사용하기

Elixir에서 PG2


PG2?


pg2는 Erlang의 stdlib로 분산되어있는 노드들에 특정 프로세스들를 그룹핑 할 수 있는 라이브러리 입니다. 이 라이브러리를 사용하면 그룹명을 가지고 Cluster로 연결된 노드들의 프로세스를 직접 호출하여 사용할 수 있습니다.


This module implements process groups. Each message may be sent to one, some, or all members of the group.

A group of processes can be accessed by a common name. For example, if there is a group named foobar, there can be a set of processes (which can be located on different nodes) which are all members of the group foobar. There are no special functions for sending a message to the group. Instead, client functions should be written with the functions get_members/1 and get_local_members/1 to find out which processes are members of the group. Then the message can be sent to one or more members of the group.

If a member terminates, it is automatically removed from the group. (erlang documents)


PG2 주요 API



create(Name :: name()) -> ok


빈 그룹 하나를 생성합니다. 이 그룹은 Cluster로 연결된 모든 Node에서 보여집니다.



delete(Name :: name()) -> ok


그룹을 삭제합니다.



join(Name :: name(), Pid :: pid()) -> ok


Process pid를 그룹에 추가합니다. 중복 Join이 가능하여 주의하여야 합니다.



leave(Name, Pid :: pid()) -> ok | {error, {no_such_group, Name}}


Process pid를 그룹에서 삭제합니다. 그룹에 Pid가 없을시에도 ok를 리턴합니다. 그룹명이 없을때만 error를 리턴합니다.



get_members(Name) -> [pid()] | {error, {no_such_group, Name}}


그룹내에 존재하는 모든 Pid들을 리턴합니다.



get_closest_pid(Name) -> pid() | {error, Reason}


local 에 있는 Process pid를 리턴합니다. 만약 로컬 Process가 죽었거나 호출 불가능 하다면 랜덤하게 하나의 Process pid를 리턴합니다.



PG2 간단 사용


pg2는 Custer로 연결된 노드들의 프로세스를 그룹핑 하기 위한 라이브러리 이므로 iex를 이용해서 두개의 노드를 실행하고 연결시키겠습니다.


첫번째 node1 을 실행시키고


$ iex --sname node1

iex(node1@syntaxfish) >

두번쨰로 node2 를 실행시키고 Node.connect() 함수를 이용하여 node1번과 연결 시킨후 Node.list() 함수를 이용하여 연결된 노드 리스트를 확인합니다.



$ iex --sname node2

iex(node2@syntaxfish) > Node.connect(:node1@syntaxfish)
:ok
iex(node2@syntaxfish) > Node.list
[:node1@syntaxfish]

다음으로 node1번에서 :test라는 그룹을 만들고 현재 shell을 그룹에 :pg2.join()함수로 추가하겠습니다. 그리고 :pg2.get_memebers() 함수로 :test 그룹을 출력해보면 shell pid가 그룹에 추가된걸 확인할 수 있습니다.



iex(node1@syntaxfish) > :pg2.create(:test)
:ok
iex(node1@syntaxfish) > :pg2.get_members(:test)
[]
iex(node1@syntaxfish) > :pg2.join(:test, self)
:ok
iex(node1@syntaxfish) > :pg2.get_members(:test)
[#PID<0.63.0>]

이렇게 PG2를 이용하여 node1의 shell을 :test그룹에 추가하는것에 성공하였습니다. 그럼 정말 Cluster로 연결된 다른 노드에서 호출이 가능한지 테스트 해보겠습니다.


테스트 방법은 node2 에서 node1의 shell_pid로 “hello!!” 라는 메시지를 보내고, 다시 node1로 돌아서와서 메시지를 확인하는것으로 해보겠습니다.


아래처럼 node1에서 메시지를 전송하고



iex(node2@syntaxfish) > :pg2.get_members(:test)
[#PID<8387.63.0>]
iex(node2@syntaxfish) > [node1_shell] = :pg2.get_members(:test)
[#PID<8387.63.0>]
iex(node2@syntaxfish) > send(node1_shell, "hello!!")
"hello!!"

node1 에서 메시지를 비우면 node2에서 보낸 메시지가 표시되는걸 확인하실 수 있습니다.



iex(node1@syntaxfish) > flush()
"hello!!"


PG2 적용 예제


PG2를 사용하여서 간단한 Echo 예제를 만들어보겠습니다.

구조는 node1 번에 EchoServer 와 통신할 EchoClinet 프로세스를 만들고, node2와 node3에는 EchoServer 프로세스를 만든후 PG2를 이용해서 :echo_server 그룹에 추가합니다. 완성된 구조는 아래 그림과 같습니다.




EchoServer


mix 를 이용해서 echo_server를 생성합니다.



$ mix new echo_server --sup

그리고 GenServer 를 이용해서 요청을 받을 EchoServer.Server 를 작성합니다.



defmodule EchoServer.Server do
use GenServer

def start_link do
GenServer.start_link(__MODULE__, :ok, name: EchoServer.Server)
end

def init(:ok) do
:pg2.create(:echo_server) # 그룹 생성
:pg2.join(:echo_server, self) # 그룹 추가

{:ok, %{}}
end

## Public API
def echo(pid, message) do
GenServer.call(pid, {:echo, message})
end

## Callback API
def handle_call({:echo, message}, _from, state) do
{:reply, build_echo_message(message), state}
end

def build_echo_message(message) do
"[#{Node.self |> to_string}] #{message}"
end
end

handle_call 로 요청을 받으면 응답을 주는 로직은 기본 GenServer와 동일하지만, 다른점은 init() 함수 내에서 PG2를 이용해서 그룹을 만들고 그룹에 추가하는 코드 입니다. 이렇게 두줄만 추가하면 Cluster로 연결된 모든 노드에서 :echo_server 라는 그룹명으로 프로세스에 메시지를 보낼 수 있습니다.



    ...
:pg2.create(:echo_server) # 그룹 생성
:pg2.join(:echo_server, self) # 그룹에 추가
...

완성된 EchoServer.Server 모듈을 Application에 추가 시킵니다.



defmodule EchoServer do
use Application

def start(_type, _args) do
import Supervisor.Spec, warn: false

children = [
worker(EchoServer.Server, [])
]

opts = [strategy: :one_for_one, name: EchoServer.Supervisor]
Supervisor.start_link(children, opts)
end
end

그리고 작동하는지 확인을 위해 테스트코드를 간단하게 작성했습니다. 첫번째 테스트는 일반적인 GenServer의 호출방식으로 호출했고, 두번째 테스트는 PG2를 이용하여 호출한것입니다.



defmodule EchoServerTest do
use ExUnit.Case

alias EchoServer.Server

@message "hi!!"

test "process call by local name" do
echo = Server.echo(Server, @message)

assert echo == Server.build_echo_message(@message)
end


test "proces call by pg2" do
echo = :pg2.get_closest_pid(:echo_server)
|> GenServer.call({:echo, @message})

assert echo == Server.build_echo_message(@message)
end
end

테스트를 진행하면 직접 호출, pg2를 이용하여 호출 둘다 정상작동하는걸 확인하실 수 있습니다.



Finished in 0.04 seconds (0.04s on load, 0.00s on tests)
2 tests, 0 failures

Randomized with seed 285286


EchoClient


이제 EchoClient를 만들어보겠습니다. EchoServer와 동일하게 mix 를 이용하여 새로운 프로젝트를 만들고.



$ mix new echo_client --sup

GenServer를 이용하여 EchoClient를 작성합니다.



defmodule EchoClient.Client do
use GenServer

@init_state %{}

def start_link do
GenServer.start_link(__MODULE__, :ok, name: EchoClient.Client)
end

def init(:ok) do
{:ok, %{}}
end

## Public API
def send_echo(pid, message) do
GenServer.call(pid, {:send_echo, message})
end

## Callback API
def handle_call({:send_echo, message}, _from, state) do
case :pg2.get_closest_pid(:echo_server) do
pid when is_pid(pid) ->
echo = GenServer.call(pid, {:echo, message})
{:reply, echo, state}
{:error, _r} = error ->
{:reply, error, state}
end
end
end

이 코드도 기본적인 GenServer의 모습과 동일합니다. 다른점은 handle_call 함수안에서 PG2를 이용하여 서버 PID를 구해오는 부분입니다.



...
case :pg2.get_closest_pid(:echo_server) do
pid when is_pid(pid) ->
echo = GenServer.call(pid, {:echo, message})
{:reply, echo, state}
{:error, _r} = error ->
{:reply, error, state}
end
...

그리고 실행을 위해 EchoClient를 Application에 추가합니다.



defmodule EchoClient do
use Application

def start(_type, _args) do
import Supervisor.Spec, warn: false

children = [
worker(EchoClient.Client, []),
]

opts = [strategy: :one_for_one, name: EchoClient.Supervisor]
Supervisor.start_link(children, opts)
end
end


실행


이제 코드는 다 완성했으니 작동하는지 직접 요청을 보내서 테스트 해보겠습니다.

일단 node1 에 EchoClient를 실행합니다.



$ iex --sname node1 -S mix

그리고 node2, node3으로 EchoServer를 실행시킵니다.



$ iex --sname node2 -S mix


$ iex --sname node3 -S mix

실행 시킨 node들을 Cluster로 연결해줍니다.



iex(node1@syntaxfish) > Node.connect :node2@syntaxfish
true
iex(node1@syntaxfish) > Node.connect :node3@syntaxfish
true
iex(node1@syntaxfish) > Node.list
[:node2@syntaxfish, :node3@syntaxfish]

모두 연결이 되었으니 node1(EchoClient) 에서 EchoServer로 메시지를 보내보겠습니다.



iex(node1@syntaxfish) > EchoClient.Client.send_echo(EchoClient.Client, "hello world!")
"[node3@syntaxfish] hello world!"
iex(node1@syntaxfish) > EchoClient.Client.send_echo(EchoClient.Client, "hello world!")
"[node3@syntaxfish] hello world!"
iex(node1@syntaxfish) > EchoClient.Client.send_echo(EchoClient.Client, "hello world!")
"[node2@syntaxfish] hello world!"

메시지가 정상적으로 node2와 node3에 요청되는걸 확인하실 수 있습니다.


전체 프로젝트는 Github에 올려놓았습니다. pg2_example


마치며…


첫 블로그 포스팅이라 코드를 치는것보다 한국말을 쓰는게 어렵네요.. 무튼 PG2는 Erlang와 Elixir에서 분산 시스템을 구축할때 자주 사용되는 모듈입니다. elixir의 대표적인 web framework인 phoenix에서도 PG2를 사용하고 있습니다.


그런데 조금 마음에 안드는 부분이 있습니다. PG2에서 원하는 그룹의 pid를 받아올때 get_closest_pid() 라는 함수를 사용하게 되는데 이 함수가 넘겨주는 방식이 그닥 마음에 들지 않습니다. 함수명은 가장 가까운 pid를 가져온다고 되어있지만 도큐먼트에서 내용은 조금 다릅니다.


This is a useful dispatch function which can be used from client functions. It returns a process on the local node, if such a process exist. Otherwise, it chooses one randomly.


로컬에 존재하지 않으면 랜덤으로 pid를 가져오는 방식… 설마 정말 랜덤으로 가져올까 싶어 otp 소스를 확인해 보았지만.. 정말 랜덤이였습니다.



%% otp/lib/kernel/src/pg2.erl
get_closest_pid(Name) ->
case get_local_members(Name) of
[Pid] ->
Pid;
[] ->
case get_members(Name) of
[] -> {error, {no_process, Name}};
Members ->
random_element(Members)
end;
Members when is_list(Members) ->
random_element(Members);
Else ->
Else
end.

random_element(List) ->
X = abs(erlang:monotonic_time()
bxor erlang:unique_integer()),
lists:nth((X rem length(List)) + 1, List).

그래서인지 Will Larson라는 분은 블로그에 각 pid들 마다 큐사이즈를 체크해서 가장 큐에 쌓이 메시지가 적은 pid를 가져오는 함수를 올려주셨는데 모든 멤버에 프로세스에 erlang:process_info() 를 호출하는게 조금 걱정되긴 하네요. (lethain.com)



get_best_pid(Group) ->
Members = pg2:get_members(Group),
Members1 = lists:map(fun(Pid) ->
[{message_queue_len, Messages}] = erlang:process_info(Pid, [message_queue_len]),
{Pid, Messages}
end, Members),
case lists:keysort(2, Members1) of
[{Pid, _} | _] -> Pid;
[] -> {error, empty_process_group}
end.

Round Robin 방식만 넣어줬어도 좋았을텐데.. 그부분이 아쉽지만 PG2는 충분히 편하고 좋은 라이브러리인건 분명합니다.


2016년 2월 11일 목요일

10분안에 훑어보는 Elixir

RT:FM + 나는프로그래머다 컨퍼런스 에서 발표한 ‪elixir‬ 라이브 코딩 슬라이드 입니다. Supervisor, GenServer, Agent를 이용해서 매우 단순한 fault-tolerance server를 구현해보는 예제인데 짧은 시간안에 라이브 코딩을 하려니 생략된 부분도 많아서 부족한 부분이 많네요.