概述
在进行网站爬取数据的时候,会发现很多网站都进行了反爬虫的处理,如JS加密,Ajax加密,反Debug等方法,通过请求获取数据和页面展示的内容完全不同,这时候就用到Selenium技术,来模拟浏览器的操作,然后获取数据。本文以一个简单的小例子,简述Python搭配Tkinter和Selenium进行浏览器的模拟操作,仅供学习分享使用,如有不足之处,还请指正。
什么是Selenium"color: #ff0000">安装Selenium
通过pip install selenium 进行安装即可,如果速度慢,则可以使用国内的镜像进行安装。
涉及知识点
程序虽小,除了需要掌握的Html ,JavaScript,CSS等基础知识外,本例涉及的Python相关知识点还是蛮多的,具体如下:
- Selenium相关:
Selenium进行元素定位,主要有ID,Name,ClassName,Css Selector,Partial LinkText,LinkText,XPath,TagName等8种方式。
Selenium获取单一元素(如:find_element_by_xpath)和获取元素数组(如:find_elements_by_xpath)两种方式。
Selenium元素定位后,可以给元素进行赋值和取值,或者进行相应的事件操作(如:click)。
- 线程(Thread)相关:
为了防止前台页面卡主,本文用到了线程进行后台操作,如果要定义一个新的线程,只需要定义一个类并继承threading.Thread,然后重写run方法即可。
在使用线程的过程中,为了保证线程的同步,本例用到了线程锁,如:threading.Lock()。
- 队列(queue)相关:
本例将Selenium执行的过程信息,保存到对列中,并通过线程输出到页面显示。queue默认先进先出方式。
对列通过put进行压栈,通过get进行出栈。通过qsize()用于获取当前对列元素个数。
- 日志(logging.Logger)相关:
为了保存Selenium执行过程中的日志,本例用到了日志模块,为Pyhton自带的模块,不需要额外安装。
Python的日志共六种级别,分别是:NOTSET,DEBUG,INFO,WARN,ERROR,FATAL,CRITICAL。
示例效果图
本例主要针对某一配置好的商品ID进行轮询,监控是否有货,有货则加入购物车,无货则继续轮询,如下图所示:
核心代码
本例最核心的代码,就是利用Selenium进行网站的模拟操作,如下所示:
class Smoking: """定义Smoking类""" # 浏览器驱动 __driver: webdriver = None # 配置帮助类 __cfg_info: dict = {} # 日志帮助类 __log_helper: LogHelper = None # 主程序目录 __work_path: str = '' # 是否正在运行 __running: bool = False # 无货 __no_stock = 'Currently Out of Stock' # 线程等待秒数 __wait_sec = 2 def __init__(self, work_path, cfg_info, log_helper: LogHelper): """初始化""" self.__cfg_info = cfg_info self.__log_helper = log_helper self.__work_path = work_path self.__wait_sec = int(cfg_info['wait_sec']) # 如果小于2,则等于2 self.__wait_sec = (2 if self.__wait_sec < 2 else self.__wait_sec) def checkIsExistsById(self, id): """通过ID判断是否存在""" try: i = 0 while self.__running and i < 3: if len(self.__driver.find_elements_by_id(id)) > 0: break else: time.sleep(self.__wait_sec) i = i + 1 return len(self.__driver.find_elements_by_id(id)) > 0 except BaseException as e: return False def checkIsExistsByName(self, name): """通过名称判断是否存在""" try: i = 0 while self.__running and i < 3: if len(self.__driver.find_elements_by_name(name)) > 0: break else: time.sleep(self.__wait_sec) i = i + 1 return len(self.__driver.find_elements_by_name(name)) > 0 except BaseException as e: return False def checkIsExistsByPath(self, path): """通过xpath判断是否存在""" try: i = 0 while self.__running and i < 3: if len(self.__driver.find_elements_by_xpath(path)) > 0: break else: time.sleep(self.__wait_sec) i = i + 1 return len(self.__driver.find_elements_by_xpath(path)) > 0 except BaseException as e: return False def checkIsExistsByClass(self, cls): """通过class名称判断是否存在""" try: i = 0 while self.__running and i < 3: if len(self.__driver.find_elements_by_class_name(cls)) > 0: break else: time.sleep(self.__wait_sec) i = i + 1 return len(self.__driver.find_elements_by_class_name(cls)) > 0 except BaseException as e: return False def checkIsExistsByLinkText(self, link_text): """判断LinkText是否存在""" try: i = 0 while self.__running and i < 3: if len(self.__driver.find_elements_by_link_text(link_text)) > 0: break else: time.sleep(self.__wait_sec) i = i + 1 return len(self.__driver.find_elements_by_link_text(link_text)) > 0 except BaseException as e: return False def checkIsExistsByPartialLinkText(self, link_text): """判断包含LinkText是否存在""" try: i = 0 while self.__running and i < 3: if len(self.__driver.find_elements_by_partial_link_text(link_text)) > 0: break else: time.sleep(self.__wait_sec) i = i + 1 return len(self.__driver.find_elements_by_partial_link_text(link_text)) > 0 except BaseException as e: return False # def waiting(self, *locator): # """等待完成""" # # self.__driver.switch_to.window(self.__driver.window_handles[1]) # Wait(self.__driver, 60).until(EC.visibility_of_element_located(locator)) def login(self, username, password): """登录""" # 5. 点击链接跳转到登录页面 self.__driver.find_element_by_link_text('账户登录').click() # 6. 输入账号密码 # 判断是否加载完成 # self.waiting((By.ID, "email")) if self.checkIsExistsById('email'): self.__driver.find_element_by_id('email').send_keys(username) self.__driver.find_element_by_id('password').send_keys(password) # 7. 点击登录按钮 self.__driver.find_element_by_id('sign-in').click() def working(self, item_id): """工作状态""" while self.__running: try: # 正常获取信息 if self.checkIsExistsById('string'): self.__driver.find_element_by_id('string').clear() self.__driver.find_element_by_id('string').send_keys(item_id) self.__driver.find_element_by_id('string').send_keys(Keys.ENTER) # 判断是否查询到商品 xpath = "//div[@class='specialty-header search']/div[@class='specialty-description']/div[" "@class='gt-450']/span[2] " if self.checkIsExistsByPath(xpath): count = int(self.__driver.find_element_by_xpath(xpath).text) if count < 1: time.sleep(self.__wait_sec) self.__log_helper.put('没有查询到item id =' + item_id + '对应的信息') continue else: time.sleep(self.__wait_sec) self.__log_helper.put('没有查询到item id2 =' + item_id + '对应的信息') continue # 判断当前库存是否有货 xpath1 = "//div[@class='product-list']/div[@class='product']/div[@class='price-and-detail']/div[" "@class='price']/span[@class='noStock'] " if self.checkIsExistsByPath(xpath1): txt = self.__driver.find_element_by_xpath(xpath1).text if txt == self.__no_stock: # 当前无货 time.sleep(self.__wait_sec) self.__log_helper.put('查询一次' + item_id + ',无货') continue # 链接path1 xpath2 = "//div[@class='product-list']/div[@class='product']/div[@class='imgDiv']/a" # 判断是否加载完毕 # self.waiting((By.CLASS_NAME, "imgDiv")) if self.checkIsExistsByPath(xpath2): self.__driver.find_element_by_xpath(xpath2).click() time.sleep(self.__wait_sec) # 加入购物车 if self.checkIsExistsByClass('add-to-cart'): self.__driver.find_element_by_class_name('add-to-cart').click() self.__log_helper.put('加入购物车成功,商品item-id:' + item_id) break else: self.__log_helper.put('未找到加入购物车按钮') else: self.__log_helper.put('没有查询到,可能是商品编码不对,或者已下架') except BaseException as e: self.__log_helper.put(e) def startRun(self): """运行起来""" try: self.__running = True url: str = self.__cfg_info['url'] username = self.__cfg_info['username'] password = self.__cfg_info['password'] item_id = self.__cfg_info['item_id'] if url is None or len(url) == 0 or username is None or len(username) == 0 or password is None or len( password) == 0 or item_id is None or len(item_id) == 0: self.__log_helper.put('配置信息不全,请检查config.cfg文件是否为空,然后再重启') return if self.__driver is None: options = webdriver.IeOptions() options.add_argument('encoding=UTF-8') options.add_argument('Accept= text / css, * / *') options.add_argument('Accept - Language= zh - Hans - CN, zh - Hans;q = 0.5') options.add_argument('Accept - Encoding= gzip, deflate') options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko') # 2. 定义浏览器驱动对象 self.__driver = webdriver.Ie(executable_path=self.__work_path + r'\IEDriverServer.exe', options=options) self.run(url, username, password, item_id) except BaseException as e: self.__log_helper.put('运行过程中出错,请重新打开再试') def run(self, url, username, password, item_id): """运行起来""" # 3. 访问网站 self.__driver.get(url) # 4. 最大化窗口 self.__driver.maximize_window() if self.checkIsExistsByLinkText('账户登录'): # 判断是否登录:未登录 self.login(username, password) if self.checkIsExistsByPartialLinkText('欢迎回来'): # 判断是否登录:已登录 self.__log_helper.put('登录成功,下一步开始工作了') self.working(item_id) else: self.__log_helper.put('登录失败,请设置账号密码') def stop(self): """停止""" try: self.__running = False # 如果驱动不为空,则关闭 self.close_browser_nicely(self.__driver) if self.__driver is not None: self.__driver.quit() # 关闭后切要为None,否则启动报错 self.__driver = None except BaseException as e: print('Stop Failure') finally: self.__driver = None def close_browser_nicely(self, browser): try: browser.execute_script("window.onunload=null; window.onbeforeunload=null") except Exception as err: print("Fail to execute_script:'window.onunload=null; window.onbeforeunload=null'") socket.setdefaulttimeout(10) try: browser.quit() print("Close browser and firefox by calling quit()") except Exception as err: print("Fail to quit from browser, error-type:%s, reason:%s" % (type(err), str(err))) socket.setdefaulttimeout(30)
其他辅助类
日志类(LogHelper),代码如下:
class LogHelper: """日志帮助类""" __queue: queue.Queue = None # 队列 __logging: logging.Logger = None # 日志 __running: bool = False # 是否记录日志 def __init__(self, log_path): """初始化类""" self.__queue = queue.Queue(1000) self.init_log(log_path) def put(self, value): """添加数据""" # 记录日志 self.__logging.info(value) # 添加到队列 if self.__queue.qsize() < self.__queue.maxsize: self.__queue.put(value) def get(self): """获取数据""" if self.__queue.qsize() > 0: try: return self.__queue.get(block=False) except BaseException as e: return None else: return None def init_log(self, log_path): """初始化日志""" self.__logging = logging.getLogger() self.__logging.setLevel(logging.INFO) # 日志 rq = time.strftime('%Y%m%d%H%M', time.localtime(time.time())) log_name = log_path + rq + '.log' logfile = log_name # if not os.path.exists(logfile): # # 创建空文件 # open(logfile, mode='r') fh = logging.FileHandler(logfile, mode='a', encoding='UTF-8') fh.setLevel(logging.DEBUG) # 输出到file的log等级的开关 # 第三步,定义handler的输出格式 formatter = logging.Formatter("%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s") fh.setFormatter(formatter) # 第四步,将logger添加到handler里面 self.__logging.addHandler(fh) def get_running(self): # 获取当前记录日志的状态 return self.__running def set_running(self, v: bool): # 设置当前记录日志的状态 self.__running = v
配置类(ConfigHelper)
class ConfigHelper: """初始化数据类""" __config_dir = None __dic_cfg = {} def __init__(self, config_dir): """初始化""" self.__config_dir = config_dir def ReadConfigInfo(self): """得到配置项""" parser = ConfigParser() parser.read(self.__config_dir + r"\config.cfg") section = parser.sections()[0] items = parser.items(section) self.__dic_cfg.clear() for item in items: self.__dic_cfg.__setitem__(item[0], item[1]) def getConfigInfo(self): """获取配置信息""" if len(self.__dic_cfg) == 0: self.ReadConfigInfo() return self.__dic_cfg
线程类(MyThread)
class MyThread(threading.Thread): """后台监控线程""" def __init__(self, tid, name, smoking: Smoking, log_helper: LogHelper): """线程初始化""" threading.Thread.__init__(self) self.threadID = tid self.name = name self.smoking = smoking self.log_helper = log_helper def run(self): print("开启线程: " + self.name) self.log_helper.put("开启线程: " + self.name) # 获取锁,用于线程同步 # lock = threading.Lock() # lock.acquire() self.smoking.startRun() # 释放锁,开启下一个线程 # lock.release() print("结束线程: " + self.name) self.log_helper.put("结束线程: " + self.name)
备注
侠客行 [唐:李白]赵客缦胡缨,吴钩霜雪明。银鞍照白马,飒沓如流星。
十步杀一人,千里不留行。事了拂衣去,深藏身与名。
闲过信陵饮,脱剑膝前横。将炙啖朱亥,持觞劝侯嬴。
三杯吐然诺,五岳倒为轻。眼花耳热后,意气素霓生。
救赵挥金槌,邯郸先震惊。千秋二壮士,烜赫大梁城。
纵死侠骨香,不惭世上英。谁能书阁下,白首太玄经。
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。
更新动态
- 凤飞飞《我们的主题曲》飞跃制作[正版原抓WAV+CUE]
- 刘嘉亮《亮情歌2》[WAV+CUE][1G]
- 红馆40·谭咏麟《歌者恋歌浓情30年演唱会》3CD[低速原抓WAV+CUE][1.8G]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[320K/MP3][193.25MB]
- 【轻音乐】曼托凡尼乐团《精选辑》2CD.1998[FLAC+CUE整轨]
- 邝美云《心中有爱》1989年香港DMIJP版1MTO东芝首版[WAV+CUE]
- 群星《情叹-发烧女声DSD》天籁女声发烧碟[WAV+CUE]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[FLAC/分轨][748.03MB]
- 理想混蛋《Origin Sessions》[320K/MP3][37.47MB]
- 公馆青少年《我其实一点都不酷》[320K/MP3][78.78MB]
- 群星《情叹-发烧男声DSD》最值得珍藏的完美男声[WAV+CUE]
- 群星《国韵飘香·贵妃醉酒HQCD黑胶王》2CD[WAV]
- 卫兰《DAUGHTER》【低速原抓WAV+CUE】
- 公馆青少年《我其实一点都不酷》[FLAC/分轨][398.22MB]
- ZWEI《迟暮的花 (Explicit)》[320K/MP3][57.16MB]