怼服务器类代码的时候往往需要考虑线程池、工作队列这类事情,很容易就产生要么服务器性能发挥不出来,要么负载过高使得多线程变成瓶颈。再或者需要考虑同步/异步,或者阻塞/非阻塞等等等等,想起来都烦。
我们平时研究iocp或者epoll,是为了提升单一节点的并发量。但若换个角度来考虑的话,也许这个事本身就不会是个问题。
本文根据个人经验,试着提供一种工作模型来解决这类问题。
文中代码以delphi/pascal为主,如果写的过程中能找到我搞过的其他语言例子,也加进来。
首先了解一下消息队列:
至于介绍就懒得copy了,需要的话各位自己搜索去吧。各家产品各有不同,找一个适合自己的最重要。消息队列的协议有很多,先选择协议,再选择产品。
经验而谈,发布/订阅这类适合IM场景,对于节约流量和稳定性的考量方面,个人觉得mqtt协议最适合,也最稳定,还可以数据持久化。何谓持久化?简单的说就是能确保数据一定送达,而因为某些原因暂时不能送达的话,则暂存于服务器上。
对稳定性和巨量并发要求极高的场景或者需要集群环境的话,个人建议rabbitmq最好,但对硬件要求较高。次之是mqtt系列的,找个适合自己的即可,多数都开源免费。这玩意我曾测试过,cpu和网卡全都满载的时候依然能干活,唯一的一个。
如果对性能有极致要求,则zeromq是最好的,没有之一。我一直没测试出这玩意的上限……
本文的主旨在于找一个简单的途径来替代以往的线程池以及拼命怼iocp的思维,所以我选的是zeromq,这玩意算是个另类,严格的说其实它并不应该属于消息队列,而应该是对以往rpc和socket传输的重新抽象和封装。各种吹牛的话就不多说了,简单的说,这玩意可以很简单的实现集群、异构、进程内/进程间、各种通信模式,至于负载均衡什么的本身就包含于其内。他自称是实现了zero-copy,那么理论上他的效率已经到极限了。
假设一个场景:若干客户端需要连接到服务器(或群),客户端发送命令,服务器负责数据处理什么的。
那么在zeromq的模型中,可以选择客户端Req,服务器网关router,服务器工作池DEALER的模式。对客户端而言,将请求发往服务器的router;对router而言,本身并不负责具体处理,只将客户端的请求分发给dealer,由dealer分发给注册上来的rep,并将rep处理结果发给对应的客户端;对工作池而言,接收命令,进行处理,返回结果。在这个流程中,客户端和工作池互相不可见,客户端只知道router的存在,工作池只能看到dealer,而将处理结果返回给哪个客户端,则是由router自动处理,将工作发给哪个rep,则是由dealer自动决定(好像有一些平衡算法的设置什么的,细节记不清了,自己看官网吧)。
router+dealer的代码,delphi/pascal版:
context := TZMQContext.Create; clients := Context.Socket(stRouter); clients.bind('tcp://*:234'); workers := Context.Socket(stDealer); workers.bind('inproc://workers'); ZMQProxy(clients, workers, nil);
python版:
url_worker = "inproc://workers" url_client = "tcp://*:234" context = zmq.Context.instance() clients = context.socket(zmq.ROUTER) clients.bind(url_client) workers = context.socket(zmq.DEALER) workers.bind(url_worker) zmq.proxy(clients, workers)
工作池rep的代码,delphi版:
for i := 1 to dbthreadnum do BeginThread(nil, 0, @dbworker, context, 0, tid); 工作函数: procedure dbworker(lcontext: TZMQContext); var receiver: TZMQSocket; begin receiver := lContext.Socket(stRep); receiver.connect('inproc://workers'); while True do begin receiver.recv(s); 工作代码放这里 receiver.send(处理结果); sleep(1); end; receiver.Free; end;
python版:
url_worker="inproc://workers" for i in range(5): thread = threading.Thread(target=worker_routine, args=(url_worker,)) thread.start() def worker_routine(worker_url, context=None): context = context or zmq.Context.instance() socket = context.socket(zmq.REP) socket.connect(worker_url) while True: string = socket.recv() 工作代码放这里 socket.send(处理结果) time.sleep(0.001)
客户端req代码,delphi版:
//如果需要处理时间很长,而且req是阻塞的,所以建议放在线程中 var context: TZMQContext; requester: TZMQSocket; begin requester := Context.Socket(stReq); requester.connect('tcp://' + 服务器IP+ ':234'); requester.send(sCmd); //发送命令 requester.recv(sResult); //接收结果 requester.Free; context.Free; end;
上面这些代码因为是从我项目中复制出来,临时修改了一些端口、变量什么的,可能会有写错的地方,如果不能运行,可以参考官网给出的各种语言例子。
大概所有语言都有对应的库、绑定什么的,具体看官网咯:http://zero.mq
上面这个例子仅仅是解决c-s结构的一些问题,实际上zmq很好玩,而且能做的远远不止这些。比如将上面的代码全放在你的主进程中,则可以替换掉线程池的模式;如将代码分布于不同机器,则一个程序的代码和流程可以分载于多个机器上运行;再比如,聪明的你一定能想到,其实这玩意跟语言无关,如果req用delphi来做客户端,而rep用c/java什么的来做服务端,也是可以的哦;再比如,你的程序内嵌一点lua,而用lua来做rep,那也直接实现了不中断就可以更新程序功能;再比如,如果你的程序压力贼大,那么可以随着业务量的逐步提高来渐渐添加rep来扩展性能,不必过多考虑底层架构的事,也不必一开始就整个超大号的服务器。
zmq对内存基本没什么需求,印象中一个router+dealer的服务器好像也就1M或者几百k内存的样子,所以不会对系统资源有什么特殊的需求。
我能想到需要注意的就四点:
1,字符集问题,特别是异构的时候,强烈建议统一字符集,当然这一点其实跟zmq没关系。
2,因为zmq传输是明文,所以需要的时候自己怼个简单的移位或者加解密算法。
3,一个经验是,其实消息队列传输的都是字符串,所以当数据量大的时候,gzip一下会有很好的效果。
4,像php这类语言本身并不是为服务器和桌面环境准备的,所以跟线程什么的是不太好搀和在一起的,也不太好处于长时间持续运行的状态,所以虽然有折中的方法来用php做服务器(我试过,可行),但是强烈建议别这么玩。