Archive for the ‘concurrency’ tag

Here’s my solution for the “N processes in a star” concurrency exercise. You can run the program by calling the main function. Comments appreciated.

-module(star).
-export([main/3, vertex_node/0, center_node/4]).

%% Write a function which starts N processes in a star,
%% and sends a message to each of them M times.
%% After the messages have been sent the processes
%% should terminate gracefully.

vertex_node() ->
    receive
        {stop, Center_PID} ->
            io:format("received ~w, from ~w, exiting~n", [stop, Center_PID]),
            %% returning normally ends the process with exit reason 'normal'
            ok;
        {Something, Center_PID} ->
            %% ~p prints any term, so the message need not be a string
            io:format("received message: ~p, from ~w~n", [Something, Center_PID]),
            Center_PID ! {self(), ok},
            vertex_node()
    end.

%% All rounds are done: tell every vertex to stop.
center_node(_, _, 0, Original_Nodes) ->
    lists:foreach(fun(Vertex_Node) -> Vertex_Node ! {stop, self()} end, Original_Nodes);

%% End of a round: start again with the full list and one round fewer.
center_node(Message, [], Count, Original_Nodes) ->
    center_node(Message, Original_Nodes, Count - 1, Original_Nodes);

%% Send to the next vertex and wait for its ack before moving on.
center_node(Message, [Vertex_Node | Remaining_Nodes], Count, Original_Nodes) ->
    Vertex_Node ! {Message, self()},
    receive
        {Pid, Msg} ->
            io:format("center received ack: ~w, from ~w~n", [Msg, Pid])
    end,
    center_node(Message, Remaining_Nodes, Count, Original_Nodes).

main(Processes, Messages, Message) ->
    Nodes = lists:map(fun(_) -> spawn(star, vertex_node, []) end, lists:seq(1, Processes)),
    spawn(star, center_node, [Message, Nodes, Messages, Nodes]).

A nicely formatted version is available here.

P.S. If anyone knows how to add syntax highlighting for Erlang to Windows Live Writer, please let me know.

Most programming languages implement concurrency poorly. Erlang takes a different approach from today’s popular programming languages and, as we’ve seen, makes it easy to create parallel processes. Let’s look more closely at how to create and communicate with these processes by writing a small (contrived and basic) program that simulates a buyer buying stocks from a seller.

We need three Erlang primitives to work with processes: spawn, ! and receive. The built-in spawn function creates a new process that executes a function, and returns the new process’s process id. The ! operator sends a message to a process, and a process uses a receive...end expression to match the messages in its mailbox against patterns and run the matching clause.
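To see the three primitives working together before the stock example, here is a minimal sketch of an echo process. The module name echo_sketch and the functions loop/0 and run/0 are made up for this illustration and are not part of the original program.

```erlang
-module(echo_sketch).
-export([run/0, loop/0]).

%% Waits for {From, Msg} and sends Msg straight back to From.
%% A bare 'stop' message ends the loop, and with it the process.
loop() ->
    receive
        {From, Msg} ->
            From ! {self(), Msg},
            loop();
        stop ->
            ok
    end.

run() ->
    Pid = spawn(echo_sketch, loop, []),   % spawn/3 returns the new pid
    Pid ! {self(), hello},                % ! sends a message to that pid
    Reply = receive                       % receive waits for the echo
                {Pid, Echoed} -> Echoed   % Pid is already bound, so only
            end,                          % a reply from that process matches
    Pid ! stop,
    Reply.
```

Calling echo_sketch:run() from the shell should hand back the atom that was sent.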

To send a message “BUY MSFT” to the Seller process, we use:

   Seller ! {{buy, msft} , Buyer}

We include the variable Buyer, which holds the buyer’s process id, so that the seller knows which process to send the acknowledgment to. Next we create the Seller process by calling the spawn/3 function: the first argument is the module name, the second is the function name, and the third is a list of arguments:

   Seller_pid = spawn(exchange, seller, [])

Both the Buyer and Seller processes handle messages by pattern matching in a receive...end expression. Here the Seller process receives the {{buy, Symbol}, Buyer_pid} message, processes it, and then acknowledges the transaction by sending ok to the buyer:

seller() ->
     receive
        {finished, _} ->
            io:format("Received finished. no one is buying~n", []);
        {{buy, Symbol}, Buyer_pid} ->
            io:format("sold ~s to buyer~n", [Symbol]),
            Buyer_pid ! ok,
            seller()
    end.

Now, let’s put these pieces together in a program in which the Buyer process buys stocks from the Seller and the Seller process acknowledges each sale; the program terminates when the Buyer has run out of stocks to buy.

-module(exchange).
-export([start/0, buyer/2, seller/0]).

buyer([], Seller_pid) ->
    Seller_pid ! {finished, self()},
    %% note the ";" instead of a "." at the end of the next line:
    %% another buyer/2 clause follows
    io:format("buyer finished~n", []);

buyer(List, Seller_pid) ->
    [Head|Tail] = List,
    Seller_pid ! {Head, self()},
    receive
        ok ->
            io:format("buyer received seller confirmation~n", [])
    end,
    buyer(Tail, Seller_pid).

seller() ->
     receive
        {finished, _} ->
            io:format("Received finished. no one is buying~n", []);
        {{buy, Symbol}, Buyer_pid} ->
            io:format("sold ~s to buyer~n", [Symbol]),
            Buyer_pid ! ok,
            seller()
            %% note there's no ";" or "," before the end
    end.

start() ->
  Things_to_buy = [{buy, sina}, {buy, msft}, {buy, intc}],
  Seller_pid = spawn(exchange, seller, []),
  spawn(exchange, buyer, [Things_to_buy, Seller_pid]).

The program illustrates the most basic ways to use Erlang processes. I’ve completely skipped over guards, error handling, timeouts and registered processes, but several detailed documents on these topics and on Erlang design principles are available on their FTP site.
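As a taste of one of the skipped topics, here is a minimal sketch of a receive with a timeout, so that a buyer would not wait forever for a seller that never answers. The module timeout_sketch and the function wait_for_ok/1 are hypothetical names and are not part of the exchange module above.

```erlang
-module(timeout_sketch).
-export([wait_for_ok/1]).

%% Waits up to Millis milliseconds for an 'ok' message in the
%% caller's mailbox. Returns 'confirmed' if one arrives in time
%% and 'timeout' otherwise.
wait_for_ok(Millis) ->
    receive
        ok ->
            confirmed
    after Millis ->
        timeout
    end.
```

The after clause is part of the receive expression itself, so the same idea could be dropped into the buyer's receive without changing the seller.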

Erlang’s main strength is its support for concurrency. Now I’ll extend the previous Erlang example to fetch the stock quotes for multiple symbols in parallel.

Because I love abstractions, I’ll use the function pmap. It works like map, but when it is called it creates one parallel process to evaluate the function for each element of the list. You can find a copy of the code on the book’s website. Generally, it’s not advisable to use pmap when the list is very small (the cost of spawning outweighs the gain) or very large (every element gets its own process). Ideally, the implementation of pmap should be modified to spawn n processes, but that is something that I’m not comfortable doing yet.
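For readers without the book at hand, here is a rough sketch of how a pmap along these lines could be written. This is not the book's lib_misc:pmap; the module name pmap_sketch is made up, and the sketch assumes the function never crashes (if it does, the caller waits forever for the missing reply).

```erlang
-module(pmap_sketch).
-export([pmap/2]).

%% Applies F to every element of L, one process per element,
%% and returns the results in the original order.
pmap(F, L) ->
    Parent = self(),
    %% Spawn one worker per element; each worker tags its reply
    %% with a unique reference so replies cannot be mixed up.
    Refs = [begin
                Ref = make_ref(),
                spawn(fun() -> Parent ! {Ref, F(X)} end),
                Ref
            end || X <- L],
    %% Collect the replies in the order the workers were started,
    %% whatever order they actually finish in.
    [receive {Ref, Result} -> Result end || Ref <- Refs].
```

Because each receive waits for one specific reference, the result list keeps the order of the input list even when later workers finish first.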

Full Listing

-module(quote).
%% json_parser is called with its module name below, so no -import is needed

-export([get_stock_quote/1, get_data_in_parallel/0]).

-define(BASE_URL, "http://www.google.com/finance/info?client=ig&q=").

symbols() ->
    [ "MSFT", "RHIE", "INTC", "DPTR",
      "RVSB", "BBGI", "SRDD", "DEAR",
      "ALKS", "GOOG", "QQQQ", "AAPL",
      "RIMM", "GEOY", "CBST", "ANGO"
    ].

get_google_url(Symbol) ->
    ?BASE_URL ++ Symbol.

get_stock_quote(Symbol) ->
    %% moved inets:start() outside this function
    URL = get_google_url(Symbol),
    %% on newer OTP releases the inets client module is httpc, not http
    {ok, {_Status, _Headers, Body}} = http:request(URL),
    %% lists:subtract/2 drops the first occurrence of each listed character,
    %% which strips the wrapper around the JSON
    PureData = lists:subtract(lists:subtract(Body, "// [ "), "] "),
    {_, {_, RealData}, _} = json_parser:dvm_parser(list_to_binary(PureData)),
    RealData.

get_data_in_parallel() ->
    inets:start(),
    lib_misc:pmap(fun get_stock_quote/1, symbols()).
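On the earlier point about limiting pmap to n processes, one simple way to approximate it is to work through the list in batches of at most N elements. The sketch below does that; the module pmap_batched_sketch and its pmap/3 are hypothetical and are not from lib_misc. It is not a true worker pool, since each batch waits for its slowest element before the next batch starts, and it assumes the function never crashes.

```erlang
-module(pmap_batched_sketch).
-export([pmap/3]).

%% Like map, but runs at most N evaluations of F at the same time.
%% The list is processed in consecutive batches of up to N elements.
pmap(F, L, N) when is_integer(N), N > 0 ->
    lists:append([pmap_batch(F, Batch) || Batch <- batches(L, N)]).

%% Splits L into consecutive sublists of at most N elements.
batches([], _N) ->
    [];
batches(L, N) when length(L) =< N ->
    [L];
batches(L, N) ->
    {Batch, Rest} = lists:split(N, L),
    [Batch | batches(Rest, N)].

%% One process per element of the batch; results keep the input order.
pmap_batch(F, Batch) ->
    Parent = self(),
    Refs = [begin
                Ref = make_ref(),
                spawn(fun() -> Parent ! {Ref, F(X)} end),
                Ref
            end || X <- Batch],
    [receive {Ref, Result} -> Result end || Ref <- Refs].
```

With the sixteen symbols above, a call such as pmap_batched_sketch:pmap(fun quote:get_stock_quote/1, Symbols, 4) would keep at most four requests in flight at once.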
