借助zookeeper可以实现服务器的注册与发现,有需求的时候调用zookeeper来发现可用的服务器,将任务均匀分配到各个服务器上去.
这样可以方便的随任务的繁重程度对服务器进行弹性扩容,客户端和服务端是非耦合的,也可以随时增加客户端.
zk_server.py
import threading import json import socket import sys from kazoo.client import KazooClient # TCP服务端绑定端口开启监听,同时将自己注册到zk class ZKServer(object): def __init__(self, host, port): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.host = host self.port = port self.sock.bind((host, port)) self.zk = None def serve(self): """ 开始服务,每次获取得到一个信息,都新建一个线程处理 """ self.sock.listen(128) self.register_zk() print("开始监听") while True: conn, addr = self.sock.accept() print("建立链接%s" % str(addr)) t = threading.Thread(target=self.handle, args=(conn, addr)) t.start() # 具体的处理逻辑,只要接收到数据就立即投入工作,下次没有数据本次链接结束 def handle(self, conn, addr): while True: data=conn.recv(1024) if not data or data.decode('utf-8') == 'exit': break print(data.decode('utf-8')) conn.close() print('My work is done!!!') # 将自己注册到zk,临时节点,所以连接不能中断 def register_zk(self): """ 注册到zookeeper """ self.zk = KazooClient(hosts='127.0.0.1:2181') self.zk.start() self.zk.ensure_path('/rpc') # 创建根节点 value = json.dumps({'host': self.host, 'port': self.port}) # 创建服务子节点 self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True) if __name__ == '__main__': if len(sys.argv) < 3: print("usage:python server.py [host] [port]") exit(1) host = sys.argv[1] port = sys.argv[2] server = ZKServer(host, int(port)) server.serve()
zk_client.py
import random import sys import time import json import socket from kazoo.client import KazooClient # 客户端连接zk,并从zk获取可用的服务器列表 class ZKClient(object): def __init__(self): self._zk = KazooClient(hosts='127.0.0.1:2181') self._zk.start() self._get_servers() def _get_servers(self, event=None): """ 从zookeeper获取服务器地址信息列表 """ servers = self._zk.get_children('/rpc', watch=self._get_servers) # print(servers) self._servers = [] for server in servers: data = self._zk.get('/rpc/' + server)[0] if data: addr = json.loads(data.decode()) self._servers.append(addr) def _get_server(self): """ 随机选出一个可用的服务器 """ return random.choice(self._servers) def get_connection(self): """ 提供一个可用的tcp连接 """ sock = None while True: server = self._get_server() print('server:%s' % server) try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((server['host'], server['port'])) except ConnectionRefusedError: time.sleep(1) continue else: break return sock if __name__ == '__main__': # 模拟多个客户端批量生成任务,推送给服务器执行 client = ZKClient() for i in range(40): sock = client.get_connection() sock.send(bytes(str(i), encoding='utf8')) sock.close() time.sleep(1)
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件!
如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
暂无“Python通过zookeeper实现分布式服务代码解析”评论...
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。
更新动态
2024年11月24日
2024年11月24日
- 凤飞飞《我们的主题曲》飞跃制作[正版原抓WAV+CUE]
- 刘嘉亮《亮情歌2》[WAV+CUE][1G]
- 红馆40·谭咏麟《歌者恋歌浓情30年演唱会》3CD[低速原抓WAV+CUE][1.8G]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[320K/MP3][193.25MB]
- 【轻音乐】曼托凡尼乐团《精选辑》2CD.1998[FLAC+CUE整轨]
- 邝美云《心中有爱》1989年香港DMIJP版1MTO东芝首版[WAV+CUE]
- 群星《情叹-发烧女声DSD》天籁女声发烧碟[WAV+CUE]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[FLAC/分轨][748.03MB]
- 理想混蛋《Origin Sessions》[320K/MP3][37.47MB]
- 公馆青少年《我其实一点都不酷》[320K/MP3][78.78MB]
- 群星《情叹-发烧男声DSD》最值得珍藏的完美男声[WAV+CUE]
- 群星《国韵飘香·贵妃醉酒HQCD黑胶王》2CD[WAV]
- 卫兰《DAUGHTER》【低速原抓WAV+CUE】
- 公馆青少年《我其实一点都不酷》[FLAC/分轨][398.22MB]
- ZWEI《迟暮的花 (Explicit)》[320K/MP3][57.16MB]