在Erlang运行时中,提供了几种机制来实现与其它程序或者语言的通信。一种为分布式Erlang,一种为端口,其中端口分为普通端口和链入式驱动端口,还有后面引入的NIFs。
-
分布式Erlang:一个Erlang运行时可以看作一个分布式Erlang Node,并通过名字访问它。一个Erlang Node可以连接和监视其他的Node,甚至在其他Node上面创建Process。消息传递和异常捕获在不同的Node之间是透明的,其分布式通信在底层上是使用的Tcp/Ip,并且现有库中有大量模块可以用于操作Node,比如global就提供了全局Node名字注册的机制。分布式Erlang主要用于Erlang与Erlang之间的通信,当然也可以用于Erlang与C之间通信,当与C之间通信时,C将被视为一个C Node。C Node指的是在C中使用Erl_interface函数库来设置连接,并与Erlang Node通信,也称为hidden node。当使用C Node与Erlang通信时,对Erlang程序员来说是完全透明的,他并不知道,也不需要关心这个Node是C还是Erlang。
-
端口:从Erlang的角度来看,端口提供了一种方式来与外部程序进行通信,主要用于本机上与外部程序通信。对于普通端口来说,它通过一种面向字节流的通信方式来与Erlang通信,而链入式驱动端口则是通过回调。普通端口通信的实现方式依赖于具体平台,在Unix上,通信通过Pipe实现,外部程序通过stdin读入,通过stdout输出。理论上外部程序可以使用任何语言实现,只要它支持这种通信方式。至于链入式驱动端口出现的原因,主要还是效率问题。因为普通端口通信的外部程序是另外一个OS进程,因此在效率要求较高的场景下,这种方式很难胜任。所谓有得必有失,链入式驱动端口由于是直接载入动态库并嵌入虚拟机内部,而动态库由C语言按回调约定编写。所以在C语言编写上要注意很多问题,比如并发,内存分配,函数可重入等等。如果C代码崩溃,也会造成整个虚拟机的崩溃。
-
NIF(Native Implemented Function):一种类似于erlang中的BIF,给我的第一感觉就想到了luajit的FFI,在R13B03版本中被引入。它是由C直接实现的函数,并且由Erlang一个模块引用,其它模块通过这个引用模块对函数进行调用。和链入式驱动端口一样,NIF也是先将C编译成动态库(so in Unix,dll in windows),然后由Erlang模块动态加载进入虚拟机,所以它也会存在链入式驱动端口的问题。当然它也是与C通信方式中效率最高的一种,调用NIFs不需要上下文切换。
-
端口通信的一些接口:在C语言这边提供了接口,包含erl_marshal,erl_eterm,erl_format,erl_malloc来处理Erlang项式结构,erl_connect与远端Node通信等等,并且在Erlang端有term_to_binary/1,binary_to_term/1来对通信的数据进行编码与解码。
接下来我们详细聊下普通端口,其它方式在后面几节介绍:
- 普通端口
我们在Erlang中通过打开一个端口与C通信,而打开这个端口的Erlang进程被称为连接进程(Connected process)。所有与端口的通信都需要通过这个连接进程,如果这个进程终止了,那么这个端口与外部程序都会被关闭。(外部程序被关闭依赖编码)。我们可以通过BIF /2函数来打开一个端口。第一个参数使用{spwan,ExtPrg}。其中ExtPrg为外部程序名称,包含该程序的启动cmd line。第二个参数是一个选项列表,比如说{packet,2}。该选项表示使用2个字节的消息头来保存长度,这个消息头在Erlang端时由Erlang端口自动帮你填充,在C端时就需要你自己填充了。除了这个外,还有许多其它选项,可以查看/2文档。
我们先看一个简单普通端口实现,然后在这个实现的基础上使用实现消息体的编码与解码
/* complex.c */int foo(int x) { return x+1;}int bar(int y) { return y*2;}我们通过普通端口,实现对C语言foo,bar函数的访问。最后的效果就如同调用complex模块的函数一样,与C函数的通信被隐藏在complex.erl内部。% Erlang code...Res = complex:foo(X),...
下面是erlang部分:complex模块的实现 -module(complex1). -export([start/1, init/1]).start(ExtPrg) -> %%模块入口,spawn连接进程 spawn(?MODULE, init, [ExtPrg]).stop() -> %%发送关闭消息 complex ! stop.init(ExtPrg) -> %%连接进程初始化函数 register(complex, self()), %%注册complex为连接进程的名字 process_flag(trap_exit, true), %%接收外部程序退出的信号 Port = open_port({spawn, ExtPrg}, [{packet, 2}]), %%打开端口 loop(Port).foo(X) -> call_port({foo, X}).bar(Y) -> call_port({bar, Y}).%%complex/foo,complex/bar 接口向complex连接进程发送消息,连接进程再将消息发送给端口call_port(Msg) -> complex ! {call, self(), Msg}, receive {complex, Result} -> Result end.%%连接进程消息接收,转发给端口loop(Port) -> receive {call, Caller, Msg} -> Port ! {self(), {command, encode(Msg)}}, %%接收call_port发来的消息,并将消息转换成字节流转发给端口 receive {Port, {data, Data}} -> %%接收端口回来的消息,转发给调用者 Caller ! {complex, decode(Data)} end, loop(Port); stop -> Port ! {self(), close}, receive {Port, closed} -> exit(normal) end; { 'EXIT', Port, Reason} -> exit(port_terminated) end. encode({foo, X}) -> [1, X]; %%这里简化了一下,约定参数和结果都小于256,实际运用中一般可以使用term_to_binary/1,binary_to_term/1接口来做erlang项式到二进制的转换encode({bar, Y}) -> [2, Y]. decode([Int]) -> Int.
/************************************************************************************************************//************************************************************************************************************/下面是C代码实现部分,在C这边,首先在读时要对消息头的长度进行解析,在写时需要填充消息头。从fd0读,往fd1写 /* erl_comm.c */typedef unsigned char byte;read_cmd(byte *buf){ int len; if (read_exact(buf, 2) != 2) %%读取两个字节的消息头 return(-1); len = (buf[0] << 8) | buf[1]; %%进行大小编转换,因为erlang的端口消息在发送后会转换到网络字节序,而我们测试的环境是小编平台的话,就需要转换 return read_exact(buf, len);}write_cmd(byte *buf, int len){ byte li; li = (len >> 8) & 0xff; %%填充消息头,先填充高位,网络字节序 write_exact(&li, 1); li = len & 0xff; write_exact(&li, 1); return write_exact(buf, len);}read_exact(byte *buf, int len){ int i, got=0; do { if ((i = read(0, buf+got, len-got)) <= 0) return(i); got += i; } while (got0) { fn = buf[0]; arg = buf[1]; if (fn == 1) { res = foo(arg); } else if (fn == 2) { res = bar(arg); } buf[0] = res; write_cmd(buf, 1); }}
上面的代码在消息体上有一些限制,比如fn,arg,res这些变量限制在255大小以内。所以现实中,我们一般使用来对消息进行封装。使用Erl_interface封装消息,我们需要改变两处代码,第一Erl_interface处理外部Erlang项式,需要端口输出二进制流,因此在打开端口时,需要添加binary选项。
open_port({spawn, ExtPrg}, [{packet, 2}])改变为open_port({spawn, ExtPrg}, [{packet, 2}, binary])
第二我们不需要自己发明消息体的编码解码约定,直接使用/1,/1来进行Erlang任何项式到二进制流的转换与逆转换
Port ! {self(), {command, encode(Msg)}},receive {Port, {data, Data}} -> Caller ! {complex, decode(Data)}end改变为Port ! {self(), {command, term_to_binary(Msg)}},receive {Port, {data, Data}} -> Caller ! {complex, binary_to_term(Data)}end
在C端,我们需要使用Erl_interface来编解码。首先,从端口传入的Erlang项式结构流需要转换成ETERM struct,它在C端用来表示Erlang项式,最后由C函数计算的结果也必须首先转成ETERM,再传回端口。
/* ei.c */#include "erl_interface.h"#include "ei.h"typedef unsigned char byte;int main() { ETERM *tuplep, *intp; ETERM *fnp, *argp; int res; byte buf[100]; long allocated, freed; erl_init(NULL, 0); //初始化函数 while (read_cmd(buf) > 0) { //得到消息体的字节流 tuplep = erl_decode(buf); //erl_decode 解码得到ETERM fnp = erl_element(1, tuplep); argp = erl_element(2, tuplep); if (strncmp(ERL_ATOM_PTR(fnp), "foo", 3) == 0) { res = foo(ERL_INT_VALUE(argp)); } else if (strncmp(ERL_ATOM_PTR(fnp), "bar", 17) == 0) { res = bar(ERL_INT_VALUE(argp)); } intp = erl_mk_int(res); //将结果保存入项式intp erl_encode(intp, buf); write_cmd(buf, erl_term_len(intp)); erl_free_compound(tuplep); erl_free_term(fnp); erl_free_term(argp); erl_free_term(intp); }}
普通端口就聊到这里,后面几篇再聊下,以及C Node。