今天主题是实现并发服务器,实现方法有多种版本,先从简单的单进程代码实现到多进程,多线程的实现,最终引入一些高级模块来实现并发TCP服务器。
说到TCP,想起吐槽大会有个段子提到三次握手,也只有程序猿(媛)能get。
UDP服务器数据传输不可靠,这里就忽略了。
:
简单的单进程TCP服务器
假代码:
#创建tcp服务器套接字
#绑定端口
#设置正常情况退出的服务器下,端口可以重用
#设置监听,变为主动监听
# 等待客户端的链接,返回新的socket和地址
#关闭tcp服务器套接字
from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR #创建tcp服务器套接字 server_socket = socket(AF_INET,SOCK_STREAM) #绑定端口 server_socket.bind(("",9999)) #设置正常情况退出的服务器下,端口可以重用 server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #设置监听,变为主动监听 server_socket.listen(5) while True: # 等待客户端的链接,返回新的socket和地址 new_socket,new_address = server_socket.accept() #接收数据,并且发送数据 try: while True: recv_data = new_socket.recv(1024) #当有客户端关闭后,recv解除阻塞,并且返回长度为0 if len(recv_data) > 0: recv_content = recv_data.decode("gb2312") print("收到:%s的信息是:%s" % (str(new_address),recv_content)) new_socket.send("thank you!".encode("gb2312")) else: print("客户端%s已经关闭" % (str(new_address))) break finally: new_socket.close() print("关闭%s客户端" % (str(new_address))) #关闭tcp服务器套接字 server_socket.close()
多进程TCP服务器
from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR from multiprocessing import Process #在子进程中接收消息 def recv_data(new_socket,new_address): while True: recv_data = new_socket.recv(1024) # 当有客户端关闭后,recv解除阻塞,并且返回长度为0 if len(recv_data) > 0: recv_content = recv_data.decode("gb2312") print("收到:%s的信息是:%s" % (str(new_address), recv_content)) new_socket.send("thank you!".encode("gb2312")) else: print("客户端%s已经关闭" % (str(new_address))) break #关闭与客户端的连接 print("关闭与客户端的连接") new_socket.close() def main(): #创建tcp服务器套接字 server_socket = socket(AF_INET,SOCK_STREAM) #绑定端口 server_socket.bind(("",8888)) #设置正常情况退出的服务器下,端口可以重用 server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #设置监听,变为被动连接 server_socket.listen(3) try: while True: # 等待客户端的链接,返回新的socket和地址 new_socket,new_address = server_socket.accept() #接收数据,并且发送数据 Process(target=recv_data,args=(new_socket,new_address)).start() #因为主进程和子进程不共享数据 #如果我们直接关闭new_socket,只是关闭主进程的new_socket,而子进程的不受影响 new_socket.close() finally: #关闭tcp服务器套接字 server_socket.close() if __name__ == "__main__": main()
多进程TCP服务器
from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR from multiprocessing import Process #在子进程中接收消息 def recv_data(new_socket,new_address): while True: recv_data = new_socket.recv(1024) # 当有客户端关闭后,recv解除阻塞,并且返回长度为0 if len(recv_data) > 0: recv_content = recv_data.decode("gb2312") print("收到:%s的信息是:%s" % (str(new_address), recv_content)) new_socket.send("thank you!".encode("gb2312")) else: print("客户端%s已经关闭" % (str(new_address))) break #关闭与客户端的连接 print("关闭与客户端的连接") new_socket.close() def main(): #创建tcp服务器套接字 server_socket = socket(AF_INET,SOCK_STREAM) #绑定端口 server_socket.bind(("",8888)) #设置正常情况退出的服务器下,端口可以重用 server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #设置监听,变为被动连接 server_socket.listen(3) try: while True: # 等待客户端的链接,返回新的socket和地址 new_socket,new_address = server_socket.accept() #接收数据,并且发送数据 Process(target=recv_data,args=(new_socket,new_address)).start() #因为主进程和子进程不共享数据 #如果我们直接关闭new_socket,只是关闭主进程的new_socket,而子进程的不受影响 new_socket.close() finally: #关闭tcp服务器套接字 server_socket.close() if __name__ == "__main__": main()
多线程TCP服务器
from socket import socket, AF_INET,SOCK_STREAM,SOL_SOCKET,SO_REUSEADDR from threading import Thread #接收消息 def recv_data(new_socket,new_address): while True: recv_data = new_socket.recv(1024) # 当有客户端关闭后,recv解除阻塞,并且返回长度为0 if len(recv_data) > 0: recv_content = recv_data.decode("gb2312") print("收到:%s的信息是:%s" % (str(new_address), recv_content)) new_socket.send("thank you!".encode("gb2312")) else: print("客户端%s已经关闭" % (str(new_address))) break def main(): #创建tcp服务器套接字 server_socket = socket(AF_INET,SOCK_STREAM) #绑定端口 server_socket.bind(("",9999)) #设置正常情况退出的服务器下,端口可以重用 server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #设置监听,变为被动连接 server_socket.listen(3) try: while True: # 等待客户端的链接,返回新的socket和地址 new_socket,new_address = server_socket.accept() #接收数据,并且发送数据 Thread(target=recv_data,args=(new_socket,new_address)).start() finally: #关闭tcp服务器套接字 server_socket.close() if __name__ == "__main__": main()
多任务协程实现 ——
greenlet和gevent
#coding=utf-8 from greenlet import greenlet import time def test1(): while True: print "---A--" gr2.switch() time.sleep(0.5) def test2(): while True: print "---B--" gr1.switch() time.sleep(0.5) gr1 = greenlet(test1) gr2 = greenlet(test2) #切换到gr1中运行 gr1.switch()
import gevent #函数 def f(n): for i in range(n): print("%s:%s" % (gevent.getcurrent(),i)) f1 = gevent.spawn(f,5) f2 = gevent.spawn(f,5) f3 = gevent.spawn(f,5) #让主线程等待三个协程执行完毕,否则没有机会执行 f1.join() f2.join() f3.join() #可以看到,3个greenlet是依次运行而不是交替运行。要让greenlet交替运行,可以通过gevent.sleep()交出控制权。
#coding=utf-8 import gevent def f(n): for i in range(n): print gevent.getcurrent(), i #用来模拟一个耗时操作,注意不是time模块中的sleep gevent.sleep(1) g1 = gevent.spawn(f, 5) g2 = gevent.spawn(f, 5) g3 = gevent.spawn(f, 5) #下面三行代码意思:主线程等待各个协成支持完,否则协成没有机会执行 g1.join() g2.join() g3.join()
单进程TCP服务器 ——
非堵塞式
from socket import AF_INET,socket,SO_REUSEADDR,SOCK_STREAM,SOL_SOCKET def main(): #创建tcp的socket套接字 server_socket = socket(AF_INET,SOCK_STREAM) server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #绑定端口 server_socket.bind(("",9999)) #设置非阻塞,也就是说accept方法不阻塞了, # 但是在没有客户端链接且被执行的时候会报错 #有客户端链接的时候正常执行 server_socket.setblocking(False) #设置监听 server_socket.listen(5) #客户端列表 client_lists = [] try: #不断调用accept while True: try: # print("accept--111") new_socket,new_address = server_socket.accept() print("accept--2222") except Exception as result: # print(result) pass else: print("新的客户%s链接上" % str(new_address)) #新链接的new_sokect默认也是阻塞,也设置为非阻塞后,recv为非阻塞 new_socket.setblocking(False) client_lists.append((new_socket,new_address)) # print(111) for client_sokect,client_address in client_lists: #接收数据 try: recv_data = client_sokect.recv(1024) except Exception as result: # print(result) pass else: # print("正常数据:%s" %recv_data) if len(recv_data) > 0 : print("收到%s:%s" % (str(client_address),recv_data)) client_sokect.send("thank you!".encode("gb2312")) else: #客户端已经端口,要把该客户端从列表中异常 client_lists.remove((client_sokect,new_address)) client_sokect.close() print("%s已经断开" % str(new_address)) finally: #关闭套接字 server_socket.close() if __name__ == "__main__": main()
单进程TCP服务器 ——
select版
select 原理
其他语言(c或者c++)也有使用select实现多任务服务器。
select 能够完成一些套接字的检查,从头到尾检查一遍后,标记哪些套接字是否可以收数据,返回的时候,就返回能接收数据的套接字,返回的是列表。select是由操作系统提供的,效率要高些,非常快的方式检测哪些套接字可以接收数据。select是跨平台的,在window也可以用。
io多路复用:没有使用多进程和多线程的情况下完成多个套接字的使用。
from socket import AF_INET,socket,SO_REUSEADDR,SOCK_STREAM,SOL_SOCKET from select import select import sys def main(): #创建tcp的socket套接字 server_socket = socket(AF_INET,SOCK_STREAM) server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #绑定端口 server_socket.bind(("",9999)) #设置监听 server_socket.listen(5) #客户端列表 socket_lists = [server_socket,sys.stdin] wirte_list = [] #是否退出 is_run = False try: while True: #检测列表client_lists那些socket可以接收数据, #检测列表[]那些套接字(socket)可否发送数据 #检测列表[]那些套接字(socket)是否产生了异常 print("select--111") #这个select函数默认是堵塞,当有客户端链接的时候解除阻塞, # 当有数据可以接收的时候解除阻塞,当客户端断开的时候解除阻塞 readable, wirteable,excep = select(socket_lists,wirte_list,[]) # print("select--2222") # print(111) for sock in wirteable: #这个会一直发送,因为他是处于已经发的状态 sock.send("thank you!".encode("gb2312")) for sock in readable: #接收数据 if sock == server_socket: print("sock == server_socket") #有新的客户端链接进来 new_socket,new_address = sock.accept() #新的socket添加到列表中,便于下次socket的时候能检查到 socket_lists.append(new_socket) elif sock == sys.stdin: cmd = sys.stdin.readline() print(cmd) is_run = cmd else: # print("sock.recv(1024)....") #此时的套接字sock是直接可以取数据的 recv_data = sock.recv(1024) if len(recv_data) > 0: print("从[%s]:%s" % (str(new_address),recv_data)) sock.send(recv_data) #把链接上有消息接收的socket添加到监听写的列表中 wirte_list.append(sock) else: print("客户端已经断开") #客户端已经断开,要移除 sock.close() socket_lists.remove(sock) #是否退出程序 if is_run: break finally: #关闭套接字 server_socket.close() if __name__ == "__main__": main()
单进程TCP服务器 ——
epoll版
from socket import * import select def main(): #创建tcp服务器套接字 server_socket = socket(AF_INET,SOCK_STREAM) #设置端口可以重用 server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) #绑定端口 server_socket.bind(("",9999)) #设置监听 server_socket.listen(5) #用epoll设置监听收数据 epoll = select.epoll() #把server_socket注册到epoll的事件监听中,如果已经注册过会发生异常 epoll.register(server_socket.fileno(),select.EPOLLIN|select.EPOLLET) #装socket列表 socket_lists = {} #装socket对应的地址 socket_address = {} while True: #返回套接字列表[(socket的文件描述符,select.EPOLLIN)], # 如果有新的链接,有数据发过来,断开链接等都会解除阻塞 print("epoll.poll--111") epoll_list = epoll.poll() print("epoll.poll--222") print(epoll_list) for fd,event in epoll_list: #有新的链接 if fd == server_socket.fileno(): print("新的客户fd==%s" % fd) new_sokect,new_address = server_socket.accept() #往字典添加数据 socket_lists[new_sokect.fileno()] = new_sokect socket_address[new_sokect.fileno()] = new_address #注册新的socket也注册到epoll的事件监听中 epoll.register(new_sokect.fileno(), select.EPOLLIN | select.EPOLLET) elif event ==select.EPOLLIN: print("收到数据了") #根据文件操作符取出对应socket new_sokect = socket_lists[fd] address = socket_address[fd] recv_data = new_sokect.recv(1024) if len(recv_data) > 0: print("已经收到[%s]:%s" % (str(address),recv_data.decode("gb2312"))) else: #客户端端口,取消监听 epoll.unregister(fd) #关闭链接 new_sokect.close() print("[%s]已经下线" % str(address)) #关闭套接字链接 server_socket.close() if __name__ == "__main__": main()
单进程TCP服务器 ——
gevent版
gevent原理
greenlet已经实现了协程,但是这个还得人工切换,是不是觉得太麻烦了,莫要捉急,python还有一个比greenlet更强大的并且能够自动切换任务的模块gevent
原理------当一个greenlet遇到IO(指的是input output 输入输出,比如网络、文件操作等)操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。
由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO.
import sys import time import gevent from gevent import socket,monkey monkey.patch_all() def handle_request(conn): while True: data = conn.recv(1024) if not data: conn.close() break print("recv:", data) conn.send(data) def server(port): s = socket.socket() s.bind(('', port)) s.listen(5) while True: newSocket, addr = s.accept() gevent.spawn(handle_request, newSocket) if __name__ == '__main__': server(7788)
首先基于以上代码模块,撒点概念问题:
1.什么是协程?
协程:存在线程中,是比线程更小的执行单元,又称微线程,纤程。自带cpu上下文,操作协程由程序员决定,它可以将一个线程分解为多个微线程,每个协程间共享全局空间的变量,每秒钟切换频率高达百万次。
2. 什么是计算密集型和IO密集型
计算密集型:要进行大量的计算,消耗cpu资源。如复杂计算,对视频进行高清解码等,全靠cpu的运算能力。而计算密集型任务完成多任务切换任务比较耗时,cpu执行任务效率就越低。在python中,多进程适合计算密集型任务。
IO密集型:涉及到网络、磁盘io的任务都是io密集型。cpu消耗少,计算量小,如请求网页,读写文件等。在python中,使用sleep达到IO密集型任务的目的,多线程适合IO密集型任务。
各大实现版本对比:
select:
1)支持跨平台,最大缺陷是单个进程打开的FD是有限的,由FD_SETSIZE设置,默认是1024;
2)对socket扫描时是线性扫描,及采用轮询方式,效率低;
3)需要维护一个存放大量FD的数据结构,使得用户空间和内核空间在传递该数据结构时复制开销大。
poll:
1)poll与select本质上没有区别,但poll没有最大连接数的限制;
2)大量的fd数组被整体复制于用户态和内核地址空间之间,不管这样的复制是不是有意义;
3)‘水平触发',如果报告了fd后,没有被处理,下次poll时还会再次报告该fd。
epoll:
1)是之前poll和select的增强版,epoll更灵活,没有描述符限制,能打开的fd远大于1024(1G的内存上能监听约10万个端口);
2)‘边缘出发',事件通知机制,效率提升,最大的特点在于它只管你活跃的连接,而跟连接总数无关。而epoll对文件描述符的操作模式之一ET是一种高效的工作方式,很大程度减少事件反复触发的次数,内核不会发送更多的通知(only once)。
以上这篇基于并发服务器几种实现方法(总结)就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
并发,服务器
稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!
昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。
这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。
而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?
更新动态
- 凤飞飞《我们的主题曲》飞跃制作[正版原抓WAV+CUE]
- 刘嘉亮《亮情歌2》[WAV+CUE][1G]
- 红馆40·谭咏麟《歌者恋歌浓情30年演唱会》3CD[低速原抓WAV+CUE][1.8G]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[320K/MP3][193.25MB]
- 【轻音乐】曼托凡尼乐团《精选辑》2CD.1998[FLAC+CUE整轨]
- 邝美云《心中有爱》1989年香港DMIJP版1MTO东芝首版[WAV+CUE]
- 群星《情叹-发烧女声DSD》天籁女声发烧碟[WAV+CUE]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[FLAC/分轨][748.03MB]
- 理想混蛋《Origin Sessions》[320K/MP3][37.47MB]
- 公馆青少年《我其实一点都不酷》[320K/MP3][78.78MB]
- 群星《情叹-发烧男声DSD》最值得珍藏的完美男声[WAV+CUE]
- 群星《国韵飘香·贵妃醉酒HQCD黑胶王》2CD[WAV]
- 卫兰《DAUGHTER》【低速原抓WAV+CUE】
- 公馆青少年《我其实一点都不酷》[FLAC/分轨][398.22MB]
- ZWEI《迟暮的花 (Explicit)》[320K/MP3][57.16MB]