怼服务器类代码的时候往往需要考虑线程池、工作队列这类事情,很容易就产生要么服务器性能发挥不出来,要么负载过高使得多线程变成瓶颈。再或者需要考虑同步/异步,或者阻塞/非阻塞等等等等,想起来都烦。

我们平时研究iocp或者epoll,是为了提升单一节点的并发量。但若换个角度来考虑的话,也许这个事本身就不会是个问题。

本文根据个人经验,试着提供一种工作模型来解决这类问题。

文中代码以delphi/pascal为主,如果写的过程中能找到我搞过的其他语言例子,也加进来。

 

首先了解一下消息队列:

至于介绍就懒得copy了,需要的话各位自己搜索去吧。各家产品各有不同,找一个适合自己的最重要。消息队列的协议有很多,先选择协议,再选择产品。

经验而谈,发布/订阅这类适合IM场景,对于节约流量和稳定性的考量方面,个人觉得mqtt协议最适合,也最稳定,还可以数据持久化。何谓持久化?简单的说就是能确保数据一定送达,而因为某些原因暂时不能送达的话,则暂存于服务器上。

对稳定性和巨量并发要求极高的场景或者需要集群环境的话,个人建议rabbitmq最好,但对硬件要求较高。次之是mqtt系列的,找个适合自己的即可,多数都开源免费。这玩意我曾测试过,cpu和网卡全都满载的时候依然能干活,唯一的一个。

如果对性能有极致要求,则zeromq是最好的,没有之一。我一直没测试出这玩意的上限……

 

本文的主旨在于找一个简单的途径来替代以往的线程池以及拼命怼iocp的思维,所以我选的是zeromq,这玩意算是个另类,严格的说其实它并不应该属于消息队列,而应该是对以往rpc和socket传输的重新抽象和封装。各种吹牛的话就不多说了,简单的说,这玩意可以很简单的实现集群、异构、进程内/进程间、各种通信模式,至于负载均衡什么的本身就包含于其内。他自称是实现了zero-copy,那么理论上他的效率已经到极限了。

 

假设一个场景:若干客户端需要连接到服务器(或群),客户端发送命令,服务器负责数据处理什么的。

那么在zeromq的模型中,可以选择客户端Req,服务器网关router,服务器工作池DEALER的模式。对客户端而言,将请求发往服务器的router;对router而言,本身并不负责具体处理,只将客户端的请求分发给dealer,由dealer分发给注册上来的rep,并将rep处理结果发给对应的客户端;对工作池而言,接收命令,进行处理,返回结果。在这个流程中,客户端和工作池互相不可见,客户端只知道router的存在,工作池只能看到dealer,而将处理结果返回给哪个客户端,则是由router自动处理,将工作发给哪个rep,则是由dealer自动决定(好像有一些平衡算法的设置什么的,细节记不清了,自己看官网吧)。

 

router+dealer的代码,delphi/pascal版:

context := TZMQContext.Create;
clients := Context.Socket(stRouter);
clients.bind('tcp://*:234');
workers := Context.Socket(stDealer);
workers.bind('inproc://workers');

ZMQProxy(clients, workers, nil);

 

python版:

url_worker = "inproc://workers"
url_client = "tcp://*:234"
context = zmq.Context.instance()
clients = context.socket(zmq.ROUTER)
clients.bind(url_client)
workers = context.socket(zmq.DEALER)
workers.bind(url_worker)

zmq.proxy(clients, workers)

 

 

工作池rep的代码,delphi版:

for i := 1 to dbthreadnum do
  BeginThread(nil, 0, @dbworker, context, 0, tid);


工作函数:
procedure dbworker(lcontext: TZMQContext);
var
  receiver: TZMQSocket;
begin
  receiver := lContext.Socket(stRep);
  receiver.connect('inproc://workers');
  while True do
  begin
   receiver.recv(s);

    工作代码放这里
    receiver.send(处理结果);
    sleep(1);
  end;
  receiver.Free;
end;

 

python版:

url_worker="inproc://workers"
for i in range(5):
    thread = threading.Thread(target=worker_routine, args=(url_worker,))
    thread.start()

def worker_routine(worker_url, context=None):
    context = context or zmq.Context.instance()
    socket = context.socket(zmq.REP)
    socket.connect(worker_url)
    while True:
        string  = socket.recv()
        工作代码放这里
        socket.send(处理结果)
        time.sleep(0.001)

 

 

客户端req代码,delphi版:

//如果需要处理时间很长,而且req是阻塞的,所以建议放在线程中
var
  context: TZMQContext;
  requester: TZMQSocket;
begin
  requester := Context.Socket(stReq);
  requester.connect('tcp://' + 服务器IP+ ':234');
        requester.send(sCmd); //发送命令
        requester.recv(sResult); //接收结果
  requester.Free;
  context.Free;
end;

 

上面这些代码因为是从我项目中复制出来,临时修改了一些端口、变量什么的,可能会有写错的地方,如果不能运行,可以参考官网给出的各种语言例子。

大概所有语言都有对应的库、绑定什么的,具体看官网咯:http://zero.mq

 

上面这个例子仅仅是解决c-s结构的一些问题,实际上zmq很好玩,而且能做的远远不止这些。比如将上面的代码全放在你的主进程中,则可以替换掉线程池的模式;如将代码分布于不同机器,则一个程序的代码和流程可以分载于多个机器上运行;再比如,聪明的你一定能想到,其实这玩意跟语言无关,如果req用delphi来做客户端,而rep用c/java什么的来做服务端,也是可以的哦;再比如,你的程序内嵌一点lua,而用lua来做rep,那也直接实现了不中断就可以更新程序功能;再比如,如果你的程序压力贼大,那么可以随着业务量的逐步提高来渐渐添加rep来扩展性能,不必过多考虑底层架构的事,也不必一开始就整个超大号的服务器。

zmq对内存基本没什么需求,印象中一个router+dealer的服务器好像也就1M或者几百k内存的样子,所以不会对系统资源有什么特殊的需求。

 

 

我能想到需要注意的就四点:

1,字符集问题,特别是异构的时候,强烈建议统一字符集,当然这一点其实跟zmq没关系。

2,因为zmq传输是明文,所以需要的时候自己怼个简单的移位或者加解密算法。

3,一个经验是,其实消息队列传输的都是字符串,所以当数据量大的时候,gzip一下会有很好的效果。

4,像php这类语言本身并不是为服务器和桌面环境准备的,所以跟线程什么的是不太好搀和在一起的,也不太好处于长时间持续运行的状态,所以虽然有折中的方法来用php做服务器(我试过,可行),但是强烈建议别这么玩。

 

 

作者 听涛

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注