Programming Erlang

Table of Contents

Programming Erlang

Erlang has many tools for building distributed systems.

Pid = spawn(Mod, Func, Args) % Spawns a new process
Pid = spawn(Fun) % Spawns a new process after evaluating Fun.
% Useful for dynamic code upgrading.
Pid ! Message % Sends Message to the process with Pid.

In Erlang, ! is the send operator, and it is asynchronous. Every process has its own mailbox that it can peruse at its own leisure. Thus, messages may be received out of order or not at all.

receive
    Pattern1 [when Guard1] ->
        Expressions1;
    Pattern2 [when Guard2] ->
        Expressions2;
end

Receive is used to pattern match on messages received by a process.

An example server might look like this:

-module(area_server_final).

-export([start/0, area/2, loop/0]).

start() ->
  spawn(area_server_final, loop, []).

area(Pid, What) ->
  rpc(Pid, What).

rpc(Pid, Request) ->
  Pid ! {self(), Request},
  receive
    {_, Response} ->
      Response
  end.

loop() ->
  receive
    {From, {rectangle, Width, Ht}} ->
      From ! {self(), Width * Ht},
      loop();
    {From, {circle, R}} ->
      From ! {self(), 3.14159 * R * R},
      loop();
    {From, Other} ->
      From ! {self(), {error, Other}},
      loop()
  end.

Processes are pretty cheap to spawn, in my computer, you’re allowed plenty of processes, each costing about two microseconds to spawn.

1> processes:max(500000).
Maximum allowed processes:33554432
Process spawn time=2.292 (1.872) microseconds
ok

You are also able to timeout in a receive, in case you want to check all your messages or run some code after a set amount of time.

Erlang is well known for its philosophy around “let it crash”. There are some other error handling methods:

Processes can become system processes, with process_flag(trap_exit, true).. This is useful for becoming a stop to propagating errors, also called a firewall.

Processes may be linked: if A and B are linked, if either terminates, an error signal will be sent to all other linked nodes.

In Pid A, Pid A can link to Pid B by calling link, so link(B).

Processes can be Monitored, which is one directional. If A monitors B, and B terminates, then A will receive a “down” message, but if A terminates, then B receives no message.

Systems can act on all error signals except kill, which is generated by exit(Pid, kill).. This can’t be caught at all.

Assume Pid A is linked to Pid B, but Pid B is a firewall, as it has become a system process. Then if Pid A terminates, Pid B can catch the signal and decline to act on the signal. This is useful in a system that is heavily linked.

% To spawn a linked pid:
-spec spawn_link(Fun) -> Pid.
-spec spawn_link(Mod, Func, Args) -> Pid.

% To spawn a monitor pid:
% The monitoring process receives this tuple {'DOWN',Ref,process,Pid,Why}
% termination
-spec spawn_monitor(Fun) -> {Pid, Ref}.
-spec spawn_monitor(Mod, Func, Args) -> {Pid, Ref}.

% To promote a process to a system process
-spec process_flag(trap_exit, true)

% to link Pids bidirectionally
-spec link(Pid) -> true.

% to unlink Pids
-spec unlink(Pid) -> true.

% to monitor a pid
-spec erlang:monitor(process, Item) -> Ref

% to unmonitor a pid
-spec demonitor(Ref) -> true.

% to terminate the current pid with reason
-spec exit(Why) -> none().

% to terminate a specific Pid with a reason
-spec exit(Pid, Why) -> true.

To create a process that watches a pid for termination and runs code on its termination:

on_exit(Pid, Fun) ->
    spawn(fun() ->
        Ref = monitor(process, Pid),
            receive
                {'DOWN', Ref, process, Pid, Why} ->
                    Fun(Why)
                end
            end).

Or to link a set of processes:

start(Fs) ->
    spawn(fun() ->
        [spawn_link(F) || F <- Fs],
        receive
            after
                infinity -> true
            end
        end).

And run code on exit.

Pid = start([F1, F2, ...]),
on_exit(Pid, fun(Why) ->
    % Some code here
end)

Or making a process that never dies:

keep_alive(Name, Fun) ->
    register(Name, Pid = spawn(Fun)),
        on_exit(Pid, fun(_Why) -> keep_alive(Name, Fun) end). % recreate
        % the process on exit

To build distributed systems, there are two models of distribution:

Distributed Erlang Programs are written to run on erlang nodes, and they are all connected. Since they can spawn processes on each other’s nodes, they should be run in a trusted environment.

Socket Based Programs are written to use IP sockets, so TCP. We can also use an API gateway or some other firewall to vet the requests first and then run them on a distributed erlang system.

The chapter goes over distributing a name server, with this definition:

-module(kvs).

-export([start/0, store/2, lookup/1]).

start() ->
  register(kvs, spawn(fun() -> loop() end)).

store(Key, Value) ->
  rpc({store, Key, Value}).

lookup(Key) ->
  rpc({lookup, Key}).

rpc(Q) ->
  kvs ! {self(), Q},
  receive
    {kvs, Reply} ->
      Reply
  end.

loop() ->
  receive
    {From, {store, Key, Value}} ->
      put(Key, {ok, Value}),
      From ! {kvs, true},
      loop();
    {From, {lookup, Key}} ->
      From ! {kvs, get(Key)},
      loop()
  end.

Next, it distributes the application between two different nodes on the same host:

$ erl -sname gandalf % set the name of the node to gandalf
(gandalf@localhost) 1> kvs:start().
true

And stores a key:

$ erl -sname bilbo
(bilbo@localhost) 1> rpc:call(gandalf@localhost, kvs,store, [weather, fine]).
true
bilbo@localhost) 2> rpc:call(gandalf@localhost,
kvs,lookup,[weather]).
{ok,fine}

And then looking it up on the first node shows it worked.

gandalf@localhost) 2> kvs:lookup(weather).
{ok,fine}

Next, to two different machines on the same LAN, we set the same cookie on both nodes to show that they have permissions to access each other’s data.

On doris, the first machine:

doris $ erl -name gandalf -setcookie abc
(gandalf@doris.myerl.example.com) 1> kvs:start().
true

And on the second machine, george

george $ erl -name bilbo -setcookie abc
(bilbo@george.myerl.example.com) 1> rpc:call(gandalf@doris.myerl.example.com,
kvs,store,[weather,cold]).
true
(bilbo@george.myerl.example.com) 2> rpc:call(gandalf@doris.myerl.example.com,
kvs,lookup,[weather]).
{ok,cold}

Finally, to connect the client and server on different hosts:

First, open port 4369 to TCP and UDP traffic, which is used by epmd (the erlang port mapper daemon).

Next, choose a set of ports to be used for distributed erlang. These ports must be open.

The command when starting the shell looks like this:

$ erl -name ... -setcookie ... -kernel inet_dist_listen_min Min \
inet_dist_listen_max Max

Some functions for distributed erlang:

% This evaluates apply(Mod, Function, Args) on Node and returns the result Result
% or {badrpc, Reason} if the call fails.
call(Node, Mod, Function, Args) -> Result | {badrpc, Reason}
% This works exactly like spawn(Fun), but the new process is spawned on Node.
spec spawn(Node, Fun) -> Pid
% This works exactly like spawn(Mod, Func, ArgList), but the new process is
% spawned on Node. spawn(Mod, Func, Args) creates a new process that evaluates
% apply(Mod, Func, Args). It returns the PID of the new process.
% Note: This form of spawn is more robust than spawn(Node, Fun). spawn(Node,
% Fun) can break when the distributed nodes are not running exactly the
% same version of a particular module.
-spec spawn(Node, Mod, Func, ArgList) -> Pid
% This works exactly like spawn_link(Fun), but the new process is spawned on Node.
-spec spawn_link(Node, Fun) -> Pid
% This works like spawn(Node, Mod, Func, ArgList), but the new process is linked
% to the current process.
-spec spawn_link(Node, Mod, Func, ArgList) -> Pid
% This forcibly disconnects a node.
-spec disconnect_node(Node) -> bool() | ignored

% If Flag is true, monitoring is turned on; if Flag is false, monitoring is turned
% off. If monitoring has been turned on, then the process that evaluated
% this BIF will be sent {nodeup, Node} and {nodedown, Node} messages if Node
% joins or leaves the set of connected Erlang nodes.
-spec monitor_node(Node, Flag) -> true
% This returns the name of the local node. nonode@nohost is returned if the
% node is not distributed.
-spec node() -> Node
% This returns the node where Arg is located. Arg can be a PID, a reference,
% or a port. If the local node is not distributed, nonode@nohost is returned.
-spec node(Arg) -> Node
% This returns a list of all other nodes in the network to which we are con-
% nected.
-spec nodes() -> [Node]
% This returns true if the local node is alive and can be part of a distributed
% system. Otherwise, it returns false.
-spec is_alive() -> bool()

This spawns a process on a remote node:

-module(dist_demo).

-export([rpc/4, start/1]).

start(Node) ->
  spawn(Node, fun() -> loop() end).

rpc(Pid, M, F, A) ->
  Pid ! {rpc, self(), M, F, A},
  receive
    {_, Response} ->
      Response
  end.

loop() ->
  receive
    {rpc, Pid, M, F, A} ->
      Pid ! {self(), catch apply(M, F, A)},
      loop()
  end.

To run the code:

doris $ erl -name gandalf -setcookie abc
(gandalf@doris.myerl.example.com) 1>

george $ erl -name bilbo -setcookie abc
(bilbo@george.myerl.example.com) 1>

(bilbo@george.myerl.example.com) 1> Pid =
dist_demo:start('gandalf@doris.myerl.example.com').
<5094.40.0>

(bilbo@george.myerl.example.com) 2> dist_demo:rpc(Pid, erlang, node, []).
'gandalf@doris.myerl.example.com'

This all works fine if all nodes are controlled by the same user.

In the case that they’re not, socket based programming is required:

To do that, erlang provides lib_chan, which allows you to create a server that requires authentication with a password.

The server creates a config file:

{port, 1234}.
{service, nameServer, password, "ABXy45",
mfa, mod_name_server, start_me_up, notUsed}.

And this code is run:

-module(mod_name_server).

-export([start_me_up/3]).

start_me_up(MM, _ArgsC, _ArgS) ->
  loop(MM).

loop(MM) ->
  receive
    {chan, _MM, {store, K, V}} ->
      kvs:store(K, V),
      loop(_MM);
    {chan, _, {lookup, K}} ->
      MM ! {send, kvs:lookup(K)},
      loop(MM);
    {chan_closed, _} ->
      true
  end.

To interface between erlang and other languages, there are a few ways:

  1. Running an external server and connecting to it via a port (kind of like a microkernel) (safe).

  2. Running an OS command and capturing the result

  3. Linking the foreign code inside the Erlang machine (unsafe).

To program with sockets:

A TCP client to get data from Google:

-module(socket_examples).

-export([nano_get_url/0]).

nano_get_url() ->
  nano_get_url("www.google.com").

nano_get_url(Host) ->
  {ok, Socket} = gen_tcp:connect(Host, 80, [binary, {packet, 0}]),
  ok = gen_tcp:send(Socket, "GET / HTTP/1.0\r\n\r\n"),
  receive_data(Socket, []).

receive_data(Socket, SoFar) ->
  receive
    {tcp, _, Bin} ->
      receive_data(Socket, [Bin | SoFar]);
    {tcp_closed, _} ->
      list_to_binary(lists:reverse(SoFar))
  end.

A TCP server might look like this:

start_nano_server() ->
  {ok, Listen} =
    gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
  {ok, Socket} = gen_tcp:accept(Listen),
  gen_tcp:close(Listen),
  loop(Socket).

loop(Socket) ->
  receive
    {tcp, _, Bin} ->
      io:format("Server received binary = ~p~n", [Bin]),
      Str = binary_to_term(Bin),
      io:format("Server (unpacked) ~p~n", [Str]),
      Reply = lib_misc:string2value(Str),
      io:format("Server replying = ~p~n", [Reply]),
      gen_tcp:send(Socket, term_to_binary(Reply)),
      loop(Socket);
    {tcp_closed, _} ->
      io:format("Server socket closed~n")
  end.

To create a sequential server, we change the start server command and the loop to block.

start_seq_server() ->
  {ok, Listen} =
    gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
  seq_loop(Listen).

seq_loop(Listen) ->
  {ok, Socket} = gen_tcp:accept(Listen),
  loop(Socket),
  seq_loop(Listen).

To create a parallel server, we spawn a new process each time for gen_tcp:accept.

This allows for potentially infinite connections, so we may want to limit the number of connections with a counter.

start_parallel_server() ->
  {ok, Listen} =
    gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
  spawn(fun() -> par_connect(Listen) end).

par_connect(Listen) ->
  {ok, Socket} = gen_tcp:accept(Listen),
  spawn(fun() -> par_connect(Listen) end),
  loop(Socket).

Sockets can be opened in 3 different modes:

  1. active
  2. active once
  3. passive

Active sockets have no congestion control – if a rogue process sends millions of messages, the flow of messages could overflow the controlling process.

Active once is active for only one message. Afterwards, it must be re-enabled.

Passive sockets allow the controlling process to control message consumption by calling gen_tcp:recv(N) , where N is the number of bytes. If N is 0, all bytes currently available are consumed.

An active (non-blocking) server might look like this:

This server cannot block the client(s), so we only use them when we’re confident that we can keep up with all the messages.

start_active_server() ->
  {ok, Listen} =
    gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, true}]),
  {ok, Socket} = gen_tcp:accept(Listen),
  loop(Socket).

A passive server might look like this:

start_passive_server() ->
  {ok, Listen} =
    gen_tcp:listen(2345, [binary, {packet, 4}, {reuseaddr, true}, {active, false}]),
  {ok, Socket} = gen_tcp:accept(Listen),
  loop_passive(Socket).

Where loop_passive might look like the following:

loop_passive(Socket) ->
    case gen_tcp:recv(Socket, N) of
        {ok, B} ->
            loop(Socket);
        {error, closed} ->
            % exit
    end.

This has its pros, but this only allows for waiting for data from one socket.

In order to receive messages from more than one socket and wait:

loop(Socket) ->
    receive
        {tcp, Socket, Data} ->
            %% when you're ready enable the next message
            inet:setopts(Sock, [{active, once}]),
            loop(Socket);
        {tcp_closed, Socket} ->
            %% do something on close
            loop(Socket);
    end.

We can also use inet:peername to find out where connections come from.

A UDP server might look like the following:

udp_server(Port) ->
  {ok, Socket} = gen_udp:open(Port, [binary]),
  udp_loop(Socket).

udp_loop(Socket) ->
  receive
    {udp, _Socket, Host, Port, _Bin} ->
      BinReply = "reply",
      gen_udp:send(Socket, Host, Port, BinReply),
      udp_loop(Socket)
  end.

A client would look like this:

udp_client(Request) ->
  {ok, Socket} = gen_udp:open(0, [binary]),
  ok = gen_udp:send(Socket, "localhost", 4000, Request),
  Value =
    receive
      {udp, _, _, _, Bin} ->
        {ok, Bin}
    after 2000 ->
      error
    end,
  gen_udp:close(Socket),
  io:format("%d ~n", [Value]);
end.

UDP can send packets in fragments, or out of order, or twice. To deal with packet fragments, we can increase the maximum transfer unit to greater than 500 (the usual default).

Our server should be able to handle packets out of order – if not, then we could use something like TCP, which handles that.

To handle messages twice, we can tag them with a unique reference.

The code would look like this:

client(Request) ->
    {ok, Socket} = gen_udp:open(0, [binary]),
        Ref = make_ref(), %% make a unique reference
        B1 = term_to_binary({Ref, Request}),
        ok = gen_udp:send(Socket, "localhost", 4000, B1),
        wait_for_ref(Socket, Ref).
    wait_for_ref(Socket, Ref) ->
        receive
            {udp, Socket, _, _, Bin} ->
                case binary_to_term(Bin) of
                    {Ref, Val} ->
                    %% got the correct value
                        Val;
                    {_SomeOtherRef, _} ->
                        %% some other value throw it away
                    wait_for_ref(Socket, Ref)
                end;
            after 1000 ->
            % timeout
        end.

To broadcast to different ports with UDP:

-module(broadcast).

-compile(export_all).

send(IoList) ->
  case inet:ifget("eth0", [broadaddr]) of
    {ok, [{broadaddr, Ip}]} ->
      {ok, S} = gen_udp:open(5010, [{broadcast, true}]),
      gen_udp:send(S, Ip, 6000, IoList),
      gen_udp:close(S);
    _ ->
      io:format("Bad interface name, or\nbroadcasting not supported\n")
  end.

listen() ->
  {ok, _} = gen_udp:open(6000),
  loop().

loop() ->
  receive
    Any ->
      io:format("received:~p~n", [Any]),
      loop()
  end.

Erlang supports storing tuples in databases with ETS and DETS. ETS stands for Erlang term storage, and is in RAM. DETS stands for Disk ETS, and is on disk. The tuples must have one key (the leftmost item) and the rest of the items can be anything.

There are four types:

There are four functions to deal with tables:

  1. Creation: (ets:new, dets:open_file).
  2. Insertion insert(Table_id, X). X can be a tuple or list of tuples.
  3. Lookup: (lookup(table_id, Key)).
  4. Disposal: (dets:close(table_id), ets:delete(table_id)).

ETS tables use a separate storage area than normal process memory – but they are owned by their calling process, and freed if that process terminates.

% Name is an atom
-spec ets:new(Name, [Opt]) -> TableId.

%[Opt] can be
set | ordered_set | bag | duplicate_bag % the types

private % only the owner can read and write to this table
public % anyone who knows the table_id can read and writer
protected % any process that knows the table can read, but only the
          % owner can write
named_table % if present, then Name can be used in place of table_id
{keypos, K} % use K as the key position. This is useful for passing a
% record or other items where the key isn't exactly there.

ETS and DETS also support other things:

For more heavy handed operations, erlang has a database, Mnesia.

Mnesia is distributed, so it takes a list of nodes to connect to:

mnesia:create_schema([node()]).

Mnesia isn’t queried via SQL, but via list comprehensions.

-record(shop, {item, quantity, cost}).
-record(cost, {name, price}).

We can set up the schema:

do_this_once() ->
    mnesia:create_schema([node()]),
    mnesia:start(),
    mnesia:create_table(shop, [{attributes, record_info(fields, shop)}]),
    mnesia:create_table(cost, [{attributes, record_info(fields, cost)}]),
    mnesia:create_table(design, [{attributes, record_info(fields, design)}]),
    mnesia:stop().

And select all items using list comprehensions.

demo(select_shop) ->
    do(qlc:q([X || X <- mnesia:table(shop)]));

You can select certain fields:

demo(select_some) ->
    do(qlc:q([{X#shop.item, X#shop.quantity} || X <- mnesia:table(shop)]))

And select if a condition is met:

demo(reorder) ->
    do(qlc:q([X#shop.item || X <- mnesia:table(shop),
        X#shop.quantity < 250
    ]));

And join two tables

demo(join) ->
    do(qlc:q([X#shop.item || X <- mnesia:table(shop),
        X#shop.quantity < 250,
        Y <- mnesia:table(cost),
        X#shop.item =:= Y#cost.name,
        Y#cost.price < 2
    ])).

We can add rows:

add_shop_item(Name, Quantity, Cost) ->
    Row = #shop{item=Name, quantity=Quantity, cost=Cost},
    F = fun() ->
        mnesia:write(Row)
    end,
    mnesia:transaction(F).

Or remove them:

remove_shop_item(Item) ->
    Oid = {shop, Item},
        F = fun() ->
            mnesia:delete(Oid)
        end,
    mnesia:transaction(F).

Transactions can also be aborted:

farmer(Nwant) ->
    F = fun() ->
        %% find the number of apples
        [Apple] = mnesia:read({shop,apple}),
        Napples = Apple#shop.quantity,
        Apple1 = Apple#shop{quantity = Napples + 2*Nwant},
        %% update the database
        mnesia:write(Apple1),
        %% find the number of oranges
        [Orange] = mnesia:read({shop,orange}),
        NOranges = Orange#shop.quantity,
        if
            NOranges >= Nwant ->
            N1 = NOranges - Nwant,
            Orange1 = Orange#shop{quantity=N1},
            %% update the database
            mnesia:write(Orange1);
        true ->
            %% Oops -- not enough oranges
            mnesia:abort(oranges)
    end
end,
mnesia:transaction(F)

You also dont have to store homogeneous data in Mnesia:

-record(design, {id, plan}).
add_plans() ->
D1 = #design{id = {joe,1},
plan = {circle,10}},
D2 = #design{id = fred,
plan = {rectangle,10,5}},
D3 = #design{id = {jane,{house,23}},
plan = {house,
[{floor,1,
[{doors,3},
{windows,12},
{rooms,5}]},
{floor,2,
[{doors,2},
{rooms,4},
{windows,15}]}]}},
F = fun() ->
        mnesia:write(D1),
        mnesia:write(D2),
        mnesia:write(D3)
    end,
mnesia:transaction(F).

Mnesia tables can be on RAM, Disk, or both.

They can also be fragmented (horizontally partitioned).

Mnesia can also have different nodes store RAM or disk copies of the database. You can use this to implement as much resilience or as little resilience as required.

Erlang also comes with a debugger, which is very useful.

Erlang’s biggest feature is OTP, the Open telecom protocol, for building generic servers:

The first server is a generic server that can take a callback to become that server:

-module(server1).

-export([start/2, rpc/2]).

start(Name, Mod) ->
  register(Name, spawn(fun() -> loop(Name, Mod, Mod:init()) end)).

rpc(Name, Request) ->
  Name ! {self(), Request},
  receive
    {_, Response} ->
      Response
  end.

loop(Name, Mod, State) ->
  receive
    {From, Request} ->
      {Response, State1} = Mod:handle(Request, State),
      From ! {Name, Response},
      loop(Name, Mod, State1)
  end.

And a callback to provide a name server.

-module(name_server).

-export([init/0, add/2, find/1, handle/2]).

-import(server1, [rpc/2]).

%% client routines
add(Name, Place) ->
  rpc(name_server, {add, Name, Place}).

find(Name) ->
  rpc(name_server, {find, Name}).

%% callback routines
init() ->
  dict:new().

handle({add, Name, Place}, Dict) ->
  {ok, dict:store(Name, Place, Dict)};
handle({find, Name}, Dict) ->
  {dict:find(Name, Dict), Dict}.

If you run it:

4> server1:start(name_server, name_server).
true
5> name_server:add(joe, "at home").
ok
6> name_server:find(joe).
{ok,"at home"}

The name server didn’t have to know anything about concurrency, spawn, send, receive, or register.

It’s just sequential code – and gen_server takes care of the rest.

The second server provided gives transactional guarantees:

If it throws an exception, it crashes.

-module(server2).

-export([start/2, rpc/2]).

start(Name, Mod) ->
  register(Name, spawn(fun() -> loop(Name, Mod, Mod:init()) end)).

rpc(Name, Request) ->
  Name ! {self(), Request},
  receive
    {_, crash} ->
      exit(rpc);
    {_, ok, Response} ->
      Response
  end.

loop(Name, Mod, OldState) ->
  receive
    {From, Request} ->
      try Mod:handle(Request, OldState) of
        {Response, NewState} ->
          From ! {Name, ok, Response},
          loop(Name, Mod, NewState)
      catch
        _:Why ->
          log_the_error(Name, Request, Why),
          %% send a message to cause the client to crash
          From ! {Name, crash},
          %% loop with the *original* state
          loop(Name, Mod, OldState)
      end
  end.

log_the_error(Name, Request, Why) ->
  io:format("Server ~p request ~p ~ncaused exception ~p~n", [Name, Request, Why]).

To add hot code swapping, we can send gen_server a message with the new code to run:

-module(server3).

-export([start/2, rpc/2, swap_code/2]).

start(Name, Mod) ->
  register(Name, spawn(fun() -> loop(Name, Mod, Mod:init()) end)).

swap_code(Name, Mod) ->
  rpc(Name, {swap_code, Mod}).

rpc(Name, Request) ->
  Name ! {self(), Request},
  receive
    {_Name, Response} ->
      Response
  end.

loop(Name, Mod, OldState) ->
  receive
    {From, {swap_code, NewCallBackMod}} ->
      From ! {Name, ack},
      loop(Name, NewCallBackMod, OldState);
    {From, Request} ->
      {Response, NewState} = Mod:handle(Request, OldState),
      From ! {Name, Response},
      loop(Name, Mod, NewState)
  end.

This can be shown by changing the name server to allow it to be hot swapped:

-module(name_server1).

-export([init/0, add/2, find/1, handle/2]).

-import(server3, [rpc/2]).

%% client routines
add(Name, Place) ->
  rpc(name_server, {add, Name, Place}).

find(Name) ->
  rpc(name_server, {find, Name}).

%% callback routines
init() ->
  dict:new().

handle({add, Name, Place}, Dict) ->
  {ok, dict:store(Name, Place, Dict)};
handle({find, Name}, Dict) ->
  {dict:find(Name, Dict), Dict}.

We can then swap that.

But we can also upgrade the server to add more functionality, namely, looking for all names:

-module(new_name_server).

-export([init/0, add/2, all_names/0, delete/1, find/1, handle/2]).

-import(server3, [rpc/2]).

%% interface
all_names() ->
  rpc(name_server, allNames).

add(Name, Place) ->
  rpc(name_server, {add, Name, Place}).

delete(Name) ->
  rpc(name_server, {delete, Name}).

find(Name) ->
  rpc(name_server, {find, Name}).
%% callback routines
init() -> dict:new().
handle({add, Name, Place}, Dict) -> {ok, dict:store(Name, Place, Dict)};
handle(allNames, Dict) -> {dict:fetch_keys(Dict), Dict};
handle({delete, Name}, Dict) -> {ok, dict:erase(Name, Dict)};
handle({find, Name}, Dict) -> {dict:find(Name, Dict), Dict}.
4> c(new_name_server).
{ok,new_name_server}
5> server3:swap_code(name_server, new_name_server).
ack
6> new_name_server:all_names().
[joe,helen]

The next server can have transactions and hot code swapping:

-module(server4).

-export([start/2, rpc/2, swap_code/2]).

start(Name, Mod) ->
  register(Name, spawn(fun() -> loop(Name, Mod, Mod:init()) end)).

swap_code(Name, Mod) ->
  rpc(Name, {swap_code, Mod}).

rpc(Name, Request) ->
  Name ! {self(), Request},
  receive
    {_Name, crash} ->
      exit(rpc);
    {_Name, ok, Response} ->
      Response
  end.

loop(Name, Mod, OldState) ->
  receive
    {From, {swap_code, NewCallbackMod}} ->
      From ! {Name, ok, ack},
      loop(Name, NewCallbackMod, OldState);
    {From, Request} ->
      try Mod:handle(Request, OldState) of
        {Response, NewState} ->
          From ! {Name, ok, Response},
          loop(Name, Mod, NewState)
      catch
        _:Why ->
          log_the_error(Name, Request, Why),
          From ! {Name, crash},
          loop(Name, Mod, OldState)
      end
  end.

log_the_error(Name, Request, Why) ->
  io:format("Server ~p request ~p ~ncaused exception ~p~n", [Name, Request, Why]).

Another server does nothing until it becomes a server you send it:

-module(server5).

-export([start/0, rpc/2]).

start() ->
  spawn(fun() -> wait() end).

wait() ->
  receive
    {become, F} ->
      F()
  end.

rpc(Pid, Q) ->
  Pid ! {self(), Q},
  receive
    {_Pid, Reply} ->
      Reply
  end.
-module(my_fac_server).

-export([loop/0]).

loop() ->
  receive
    {From, {fac, N}} ->
      From ! {self(), fac(N)},
      loop();
    {become, Something} ->
      Something()
  end.

fac(0) ->
  1;
fac(N) ->
  N * fac(N - 1).

A callback server for gen_server takes 3 steps:

  1. Decide on the name
  2. Write the interface functions
  3. Implement the six callback routines:
    • init/1
    • handle_call/3
    • handle_cast/2
    • handle_info/2
    • terminate/2
    • code_change/3

A template might look like this:

-module(gen_server_template).

%% gen_server_mini_template
-behaviour(gen_server).

-export([start_link/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

start_link() ->
  gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

init([]) ->
  {ok, State}.

handle_call(_Request, _From, State) ->
  {reply, Reply, State}.

handle_cast(_Msg, State) ->
  {noreply, State}.

handle_info(_Info, State) ->
  {noreply, State}.

terminate(_Reason, _State) ->
  ok.

code_change(_OldVsn, State, Extra) ->
  {ok, State}.

OTP also provides supervisors, which are processes that monitor other processes.

One kind is one_for_one, where if a worker fails, it is restarted by the supervisor, and another is one_for_all, where if one process crashes, all are terminated and restarted.

An example using one_for_one might look like this:

-module(sellaprime_supervisor).

-behaviour(supervisor). % see erl -man supervisor

-export([start/0, start_in_shell_for_testing/0, start_link/1, init/1]).

start() ->
  spawn(fun() -> supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []) end).

start_in_shell_for_testing() ->
  {ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []),
  unlink(Pid).

start_link(Args) ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, Args).

init([]) ->
  %% Install my personal error handler
  gen_event:swap_handler(alarm_handler, {alarm_handler, swap}, {my_alarm_handler, xyz}),
  {ok,
   {{one_for_one, 3, 10},
    [{tag1, {area_server, start_link, []}, permanent, 10000, worker, [area_server]},
     {tag2, {prime_server, start_link, []}, permanent, 10000, worker, [prime_server]}]}}.

Finally, we create an app, which starts the supervisor:

-module(sellaprime_app).

-behaviour(application).

-export([start/2, stop/1]).

start(_Type, StartArgs) ->
  sellaprime_supervisor:start_link(StartArgs).

stop(_State) ->
  ok.

An app started by and run by a supervisor. Nice.

Some other interesting servers:

A multi purpose server:

-module(multi_server).

-export([start/0]).

start() ->
  spawn(fun() -> multi_server() end).

multi_server() ->
  receive
    {_Pid, {email, _From, _Subject, _Text} = Email} ->
      {ok, S} = file:open("mbox", [write, append]),
      io:format(S, "~p.~n", [Email]),
      file:close(S);
    {_Pid, {im, From, Text}} ->
      io:format("Msg (~s): ~s~n", [From, Text]);
    {Pid, {get, File}} ->
      Pid ! {self(), file:read_file(File)};
    Any ->
      io:format("multi server got:~p~n", [Any])
  end,
  multi_server().

A stateful module:

-module(counter).

-export([bump/2, read/1]).

bump(N, {counter, K}) ->
  {counter, N + K}.

read({counter, N}) ->
  N.