
Simulated login and file download with Python

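It has been a long time since I last updated this blog, haha.

Today my company handed me this requirement: fetch the previous day's order numbers from Taobao, use each order number to fetch the order details from a second interface, and then display the results on a web page!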

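The technical points involved are:

  • Simulated login
  • Simulated download
  • Parsing the Excel file data stream
  • Reading the Excel file and pulling out the order numbers
  • And finally, calling the order-detail interface

I'll go through them one by one. When I first got the requirement it felt vague, because I had never done any of these things. Once I calmed down and worked through it, it turned out not to be hard at all; in fact it was quite simple.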

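Simulated login

I. Analyzing the page's request headers

The login address this time is https://huoche.alitrip.com/hello.htm

1. I logged in once and inspected the request. It carries just three things: a hidden token, the username, and the password.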

[Figure: illustration from the original post]

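It is clear at a glance: this is just a back-office page, so it is comparatively simple, haha. Next I only need to take care of the cookie and then log in with the token, username, and password.

A tip: with Python's requests module you don't have to manage cookies by hand. Create a session object and it stores the cookies the server sets and sends them back on later requests, so there is no need to try cookies one at a time. That saves a lot of code and time.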
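To make the session idea concrete, here is a minimal sketch of the bookkeeping a cookie-keeping session does between two requests. It is a stand-in rather than the requests API: it uses the standard library's http.cookiejar (Python 3), which the requests cookie jar builds on, and feeds it a hand-made response so it runs without network access. FakeResponse, the example.com URLs, and the sid=abc123 cookie are all made up for illustration.

```python
import email.message
import http.cookiejar
import urllib.request


class FakeResponse:
    """Hypothetical stand-in for a server response that sets one cookie."""

    def __init__(self, set_cookie):
        self._headers = email.message.Message()
        self._headers["Set-Cookie"] = set_cookie

    def info(self):
        # CookieJar.extract_cookies() reads the response headers through info()
        return self._headers


def cookie_sent_on_next_request():
    jar = http.cookiejar.CookieJar()

    # First request: the "server" answers with a Set-Cookie header.
    login_request = urllib.request.Request("https://example.com/hello.htm")
    jar.extract_cookies(FakeResponse("sid=abc123; Path=/"), login_request)

    # Second request: the jar attaches the stored cookie on its own.
    next_request = urllib.request.Request("https://example.com/index.htm")
    jar.add_cookie_header(next_request)
    return next_request.get_header("Cookie")


cookie_header = cookie_sent_on_next_request()
print(cookie_header)
```

A requests session object does this store-and-resend step for every get and post call made through it, which is why the login code never touches cookies directly.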

2. The code is as follows

import base64
import re

import requests


# Python 2 code. re_search() is the small regex helper defined in the full source at the end of this post.
class TbTomas(object):
    def __init__(self):
        # Initialize configuration
        self.session_obj = requests.session()

    def download_file(self, thomas_username, thomas_password):
        hello_url = 'https://huoche.alitrip.com/hello.htm'
        # Fetch the login page
        hello_response = self.session_obj.get(hello_url)
        # Pull the hidden token out of the page with a regex
        h_u_s = re_search('<input type="hidden" id="h_u_s" name="h_u_s" value="(.*?)">', hello_response.text)
        h_u_s = base64.b64encode(h_u_s)
        headers = {
            'Accept': 'text/html, application/xhtml+xml, image/jxr, */*',
            'Referer': 'https://huoche.alitrip.com/hello.htm',
            'Accept-Language': 'zh-CN',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Safari/537.36 Edge/13.10586',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept-Encoding': 'gzip, deflate',
            'Host': 'huoche.alitrip.com',
            'Content-Length': '73',
            'Connection': 'Keep-Alive',
            'Cache-Control': 'no-cache'
        }
        post_data = {
            'h_u_s': base64.b64encode(h_u_s),
            'h_u_n': thomas_username,
            'h_u_p': base64.b64encode(thomas_password)
        }
        index_url = 'https://huoche.alitrip.com/index.htm'
        index_response = self.session_obj.post(index_url, headers=headers, data=post_data)
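The login code above is Python 2, where base64.b64encode accepts a plain string. In Python 3 it takes bytes and returns bytes, so the form-building step needs an explicit encode and decode. Here is a minimal sketch of just that step, assuming the same three form fields; build_login_payload and b64 are hypothetical helper names, and encoding the token twice simply mirrors what the code above does rather than anything verified against the server.

```python
import base64
import re

# Same pattern the post uses to find the hidden token in the login page
TOKEN_PATTERN = re.compile(
    r'<input type="hidden" id="h_u_s" name="h_u_s" value="(.*?)">'
)


def b64(text):
    """Base64-encode a str and return a str (Python 3 works on bytes in between)."""
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def build_login_payload(html, username, password):
    """Build the login form data from the page HTML and the credentials."""
    match = TOKEN_PATTERN.search(html)
    token = match.group(1) if match else ""
    return {
        "h_u_s": b64(b64(token)),  # encoded twice, mirroring the post's code
        "h_u_n": username,
        "h_u_p": b64(password),
    }


sample_html = '<input type="hidden" id="h_u_s" name="h_u_s" value="abc">'
payload = build_login_payload(sample_html, "demo_user", "demo_password")
print(payload)
```

The resulting dictionary is what would be passed as data= to the session's post call.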

Once the POST request has been submitted, you can check the response to see whether the login succeeded. Simple, isn't it? Haha!

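Downloading the data

Downloading works the same way as logging in: send a POST request to the site, just as the web page would, and the Excel file comes back. Python has a module called xlrd for working with Excel files, which is very convenient.

1. The page asks for a date and downloads that day's data. The task from my manager is to download the previous day's data, so a few lines of code are needed to compute yesterday's date.

[Figure: illustration 1 from the original post]

The code is as follows: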

import datetime

today = datetime.datetime.now()
yesterday = today + datetime.timedelta(days=-1)
trade_date = yesterday.strftime('%Y-%m-%d')
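The same calculation can be wrapped in a small function that accepts the reference day as a parameter, which makes it easy to check edge cases such as month ends and leap years. This is only a sketch; previous_day is a hypothetical name, not something from the original code.

```python
import datetime


def previous_day(today=None):
    """Return the day before `today` as a YYYY-MM-DD string."""
    if today is None:
        today = datetime.date.today()
    return (today - datetime.timedelta(days=1)).strftime("%Y-%m-%d")


# The day before 1 March in a leap year is 29 February
print(previous_day(datetime.date(2016, 3, 1)))
```

Called with no argument, it returns yesterday's date in the format the orderExportDate field expects.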

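2. Look at the URL the download request goes to and the data it submits. One picture makes everything clear

[Figure: illustration 2 from the original post]

The figure shows the URL the request is sent to, the request method, the request headers, and the returned data

3. Simulate the download request. Submitting the date is all it takes. Once the file has been downloaded, the next step is to read it and pull out what we need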

        download_url = 'https://huoche.alitrip.com/orderlistexp.do'
        post_data = {
            'orderExportDate': trade_date
        }
        sheet_content = ""
        # Try the download up to three times
        for _ in xrange(3):
            try:
                # Get the Excel file stream
                download_response = self.session_obj.post(download_url, data=post_data)
                # Open the Excel workbook from the downloaded bytes
                xls_content = xlrd.open_workbook(file_contents=download_response.content)
                sheet_content = xls_content.sheets()[0]
                break
            except Exception as e:
                continue
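The retry loop above silently swallows every error and leaves sheet_content as an empty string if all three attempts fail. As an alternative, here is a minimal Python 3 sketch of a reusable retry helper that re-raises the last error instead. The names retry, action, and recover are hypothetical, and flaky_download only simulates a download that times out twice.

```python
import time


def retry(action, attempts=3, delay=0.0, recover=None):
    """Call action() up to `attempts` times and return its first successful result.

    recover(), if given, runs after each failure that still has a retry left
    (for example, to log in again). The last error is re-raised if every
    attempt fails.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except Exception as error:
            last_error = error
            if attempt < attempts:
                if recover is not None:
                    recover()
                time.sleep(delay)
    raise last_error


calls = {"count": 0}


def flaky_download():
    """Simulated download: fails twice, then returns some bytes."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise IOError("simulated timeout")
    return b"excel bytes"


result = retry(flaky_download)
print(result, calls["count"])
```

In the post's setting, action would wrap the session post plus xlrd.open_workbook, and recover would be the login call.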

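4. This part is familiar to everyone. Just like reading an ordinary file, loop over the rows one at a time and append each order number to a list

        order_item = []
        for line_num in range(sheet_content.nrows):
            line_item = sheet_content.row_values(line_num)
            if line_item[2]:
                order_item.append(line_item[2])  # order number (order_no), third column
        # All the order numbers, minus the first entry (presumably the header row)
        order_item = order_item[1:]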
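The row loop can also be written as a function over plain lists, which makes it testable without a real Excel file. The sketch below assumes, as the code above does, that the order number sits in the third column and that the first row is a header. extract_order_numbers and the sample rows are made up for illustration; the de-duplication matches what the full source at the end of the post does.

```python
def extract_order_numbers(rows, column=2, header_rows=1):
    """Collect non-empty, de-duplicated values of one column, skipping the header rows."""
    seen = set()
    order_numbers = []
    for row in rows[header_rows:]:
        if len(row) <= column:
            continue  # row too short to contain the column
        value = row[column]
        if value and value not in seen:
            seen.add(value)
            order_numbers.append(value)
    return order_numbers


sample_rows = [
    ["Date", "Buyer", "Order no"],
    ["2016-11-28", "a", "T1001"],
    ["2016-11-28", "b", ""],
    ["2016-11-28", "c", "T1002"],
    ["2016-11-28", "a", "T1001"],
    ["short row"],
]
order_numbers = extract_order_numbers(sample_rows)
print(order_numbers)
```

With xlrd, the rows argument could be built as [sheet_content.row_values(i) for i in range(sheet_content.nrows)].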

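With the order numbers in hand, the next step is fetching the order details. My manager told me a colleague had already written that code and I only needed to call the interface, so I won't show someone else's code here. The principle is simple:

Use the requests module to request the URL, pass the order number as a GET parameter, send the request, and the data comes back. How to display it on a web page depends on each company's requirements: store it in the database and take out whatever you need.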
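Since the colleague's interface is not shown, the following is only a sketch of the "order number as a GET parameter" idea. The endpoint https://api.example.com/order/detail and the parameter name order_no are placeholders, not the real interface, and build_detail_url is a hypothetical helper. It uses the standard library (Python 3) to show how the query string gets encoded.

```python
from urllib.parse import urlencode


def build_detail_url(base_url, order_no):
    """Append the order number to the URL as an encoded query parameter."""
    return base_url + "?" + urlencode({"order_no": order_no})


# Placeholder endpoint, for illustration only
detail_url = build_detail_url("https://api.example.com/order/detail", "T1001")
print(detail_url)
```

With requests, the same encoding happens when the parameters are passed as params={"order_no": order_no} to requests.get, so the URL does not have to be assembled by hand.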

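That's it for this post. If you learned something, please give it a like, haha

Finally, the full source is attached. I won't be telling you the username and password, haha

[Figure: illustration 3 from the original post]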

#!/usr/bin/python
# coding:utf-8
import sys
import os
import django

reload(sys)
sys.setdefaultencoding('utf8')
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))  # Add the directory containing manage.py to the module search path
os.environ['DJANGO_SETTINGS_MODULE'] = 'business.settings'  # Point at the settings module
django.setup()  # Initialize the Django environment

import requests
import re
import logging
import base64
import xlrd
import datetime
import time
import MySQLdb
import threadpool
from business import settings
from train.depends.platform import Platform
from train.models import TbTomasOrder,TbTomasEpay,TtTicketThomas,TbTomasLinkman
from train import utils
from train.status import OrderStatus
from django.core.mail import EmailMultiAlternatives
from train.busi import insert_order,insert_ticket,insert_epay,insert_linkman

logger = logging.getLogger('django')


class TbTomas(object):
    succ_number = 0
    fail_number = 0
    fail_order = []

    def __init__(self,thread_num = 3):
        # Initialize configuration
        self.session_obj = requests.session()
        self.fail_order = []
        self.succ_number = 0
        self.fail_number = 0
        self.thread_num = thread_num
        self.start_date = ""
        self.end_date = ""
        self.trade_date = utils.now()

    def login_thomas(self,thomas_username,thomas_password):
        hello_url = 'https://huoche.alitrip.com/hello.htm'
        hello_response = self.session_obj.get(hello_url)
        h_u_s = re_search('<input type="hidden" id="h_u_s" name="h_u_s" value="(.*?)">', hello_response.text)
        h_u_s = base64.b64encode(h_u_s)
        headers = {
            'Accept': 'text/html, application/xhtml+xml, image/jxr, */*',
            'Referer': 'https://huoche.alitrip.com/hello.htm',
            'Accept-Language': 'zh-CN',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Safari/537.36 Edge/13.10586',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept-Encoding': 'gzip, deflate',
            'Host': 'huoche.alitrip.com',
            'Content-Length': '73',
            'Connection': 'Keep-Alive',
            'Cache-Control': 'no-cache'
        }
        post_data = {
            'h_u_s': base64.b64encode(h_u_s),
            'h_u_n': thomas_username,
            'h_u_p': base64.b64encode(thomas_password)
        }
        index_url = 'https://huoche.alitrip.com/index.htm'
        index_response = self.session_obj.post(index_url, headers=headers, data=post_data)
        logger.info(u"Login succeeded, waiting to download the file...")

    def download_file(self,thomas_username,thomas_password,args):
        for _ in xrange(3):
            try:
                self.login_thomas(thomas_username,thomas_password)
                break
            except Exception as e:
                logger.error(e)
                continue
        # Work out which dates to process
        all_time = self.date_time_handle(args)
        if not all_time:
            logger.error(u"Invalid date format!!")
            return
        for trade_date in all_time:
            try:
                self.trade_date = trade_date
                post_data = {
                    'orderExportDate': trade_date
                }
                download_url = 'https://huoche.alitrip.com/orderlistexp.do'
                sheet_content = ""
                for _ in xrange(3):
                    try:
                        # Get the Excel file stream
                        download_response = self.session_obj.post(download_url, data=post_data)
                        # Open the Excel workbook from the downloaded bytes
                        xls_content = xlrd.open_workbook(file_contents=download_response.content)
                        sheet_content = xls_content.sheets()[0]
                        logger.info(u"File downloaded, extracting order numbers")
                        break
                    except Exception as e:
                        logger.error(u"File download timed out, logging in again before retrying...")
                        self.login_thomas(thomas_username, thomas_password)
                        continue
                order_item = []
                if not sheet_content:
                    logger.error(u'File download failed, logging in again...')
                    continue
                for line_num in range(sheet_content.nrows):
                    line_item = sheet_content.row_values(line_num)
                    if line_item[2] and line_item[2] not in order_item:
                        order_item.append(line_item[2])  # order number (order_no)
                # All the order numbers, minus the first entry
                order_item = order_item[1:]
                # Fetch the order details for each order number
                logger.info(u"Writing to the database")
                # Run in multiple threads
                pool = threadpool.ThreadPool(self.thread_num)
                reqs = threadpool.makeRequests(self.create_order_info, order_item)
                [pool.putRequest(req) for req in reqs]
                pool.wait()
                logger.info(u'Write complete, completion time: %s'% self.trade_date)
                content = self.add_content(len(order_item), self.succ_number, self.fail_number, self.fail_order)
                self.send_mail(content=content)
                # Reset the counters for the next date (the original reset fail_order here instead of fail_number)
                self.succ_number, self.fail_number = 0, 0
                self.fail_order = []
                # self.create_order_info(order_item)
            except Exception as e:
                logger.error(e)

    def date_time_handle(self,args):
        all_time = []
        if args:
            if len(args) == 1:
                self.start_date = datetime.datetime.strptime(args[0], "%Y-%m-%d").date()
                self.end_date = datetime.datetime.strptime(datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d").date()
            elif len(args) == 2:
                self.start_date = datetime.datetime.strptime(args[0], "%Y-%m-%d").date()
                self.end_date = datetime.datetime.strptime(args[1], "%Y-%m-%d").date()
            elif len(args) == 3:
                self.start_date = datetime.datetime.strptime(args[0], "%Y-%m-%d").date()
                self.end_date = datetime.datetime.strptime(args[1], "%Y-%m-%d").date()
                self.thread_num = int(args[2])
            else:
                logger.error(u"Invalid arguments, please run again")
                return
            i = 0
            while True:
                tomoary = self.start_date + datetime.timedelta(days=i)
                trade_date = tomoary.strftime('%Y-%m-%d')
                all_time.append(trade_date)
                i += 1
                # >= rather than ==, so a start date after the end date cannot loop forever
                if tomoary >= self.end_date:
                    break
        else:
            today = datetime.datetime.now()
            yesterday = today + datetime.timedelta(days=-1)
            trade_date = yesterday.strftime('%Y-%m-%d')
            all_time.append(trade_date)
        return all_time

    def create_order_info(self, order):
        platform_obj = Platform()
        order_info = platform_obj.get_order(order)
        if not order_info:
            self.fail_order.append(order)
            self.fail_number += 1
            logger.error('Failed to fetch order [%s]'%order)
            return
        try:
            # Insert into the order table
            if TbTomasOrder.objects.filter(order_no=order).exists():
                logger.error('Order [%s] already exists in TbTomasOrder'%order)
                self.fail_order.append(order)
                self.fail_number += 1
                return
            else:
                insert_order(order_info,order,self.trade_date)
                self.succ_number += 1
            # Insert into the ticket table
            insert_ticket(order_info,order,self.trade_date)
            # Insert the contact
            if TbTomasLinkman.objects.filter(order_no=order).exists():
                logger.error('Order [%s] already exists in TbTomasLinkman'%order)
            else:
                insert_linkman(order_info,order,self.trade_date)
            # Insert into the epay table
            if TbTomasEpay.objects.filter(order_no=order).exists():
                logger.error('Order [%s] already exists in TbTomasEpay'%order)
            else:
                insert_epay(order_info,order,self.trade_date)
        except Exception as e:
            logger.error(e)
            self.fail_number +=1

    def add_content(self,total,succ_number,fail_number,fail_order):
        content = u'''
        <h3>Thomas order import report</h3>
        <div class="col-xs-12">
            <table border="1" cellpadding="3" cellspacing="1">
                <tr>
                    <td>Date</td>
                    <td>Total orders</td>
                    <td>Succeeded</td>
                    <td>Failed</td>
                    <td>Failed order numbers</td>
                </tr>
                <tr>
                    <td>%s</td>
                    <td>%s</td>
                    <td>%s</td>
                    <td>%s</td>
                    <td>%s</td>
                </tr>
            </table>
        </div>
        '''%(datetime.datetime.now().strftime("%Y-%m-%d %H:%M"),total,succ_number,fail_number,fail_order)
        return content

    def send_mail(self, content):
        time_target = self.trade_date
        subject = u'Thomas data scraping report %s' % (time_target)
        logger.info(u'Preparing to send mail....%s', subject)
        mail_address = settings.mail_address_thomas
        to_addr = []
        if isinstance(mail_address, list):
            to_addr += mail_address
        elif isinstance(mail_address, str):
            to_addr.append(mail_address)
        logger.debug(to_addr)
        from_email = settings.DEFAULT_FROM_EMAIL
        msg = EmailMultiAlternatives(subject, 'result', from_email, to_addr)
        msg.attach_alternative(content, "text/html")
        flag = msg.send()
        if flag:
            logger.info(u'%s sent successfully', subject)
        else:
            logger.error(u'%s failed to send', subject)
        return

    def run(self, username,passwd,args):
        # Log in to the Thomas back office
        for _ in xrange(3):
            try:
                self.download_file(username,passwd,args)
                break
            except Exception as e:
                logger.error(e)
                continue


def re_search(regex, subject):
    subject = str(subject)
    obj = re.compile(regex)
    match = obj.search(subject)
    if match:
        result = match.group(1)
    else:
        result = ''
    return result


def main():
    username = base64.b64decode(settings.THOMAS_USERNAME)
    passwd = base64.b64decode(settings.THOMAS_PASSWORD)
    args = sys.argv[1:] if sys.argv[1:] else ""
    TbTomas().run(username,passwd,args)


if __name__ == "__main__":
    main()
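The date handling in the full source expands a start and end date into a list of days with a while loop. A loop that stops only on exact equality with the end date runs forever when the start date is later than the end date, so here is a hedged Python 3 sketch of the same expansion that computes the number of days up front and rejects a reversed range. date_range and its parameters are hypothetical names, not part of the original class.

```python
import datetime


def date_range(start, end=None, today=None):
    """Return every day from start to end (inclusive) as YYYY-MM-DD strings.

    When end is omitted, the range runs up to `today` (defaulting to the
    current date), like the one-argument case in the full source.
    """
    fmt = "%Y-%m-%d"
    start_date = datetime.datetime.strptime(start, fmt).date()
    if end is None:
        end_date = today or datetime.date.today()
    else:
        end_date = datetime.datetime.strptime(end, fmt).date()
    if start_date > end_date:
        raise ValueError("start date %s is after end date %s" % (start_date, end_date))
    days = (end_date - start_date).days
    return [(start_date + datetime.timedelta(days=i)).strftime(fmt) for i in range(days + 1)]


print(date_range("2016-02-27", "2016-03-01"))
```

Raising on a reversed range is a design choice for this sketch; the script could just as well log the problem and return an empty list.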


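Reposted from: https://www.cnblogs.com/aylin/p/6114818.html

The original author is an interesting person. If this repost infringes any rights, please let us know and it will be removed.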
