Notes on the development process of a Python scraping script
1. Clarify the requirement
Fetch the data at https://eth.bitaps.com/ and save it to an Excel-readable file for convenient reading.
2. Observe the patterns
Clicking the button at the bottom opens the detail page, whose information follows a regular structure.
In the browser's developer tools, find the request that returns the content and copy it as cURL.
Then generate the equivalent requests code from the cURL command with the converter below; a sketch of what the conversion produces follows.
http://tool.yuanrenxue.com/curl
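For illustration, here is a minimal sketch of the conversion, assuming a trimmed-down cURL command (the one you actually copy from DevTools will carry far more headers):

# Copied from DevTools ("Copy as cURL"), trimmed for illustration:
#   curl 'https://eth.bitaps.com/blocks/list?page=1' -X POST -H 'x-requested-with: XMLHttpRequest'
# The converter turns it into equivalent requests code along these lines:
import requests

headers = {'x-requested-with': 'XMLHttpRequest'}
params = {'page': '1'}
response = requests.post('https://eth.bitaps.com/blocks/list', headers=headers, params=params)
print(response.status_code, len(response.text))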
3. Implementation
Example code 01
# Install the dependencies:
# pip install -U requests -i https://pypi.tuna.tsinghua.edu.cn/simple
# pip install -U pyquery -i https://pypi.tuna.tsinghua.edu.cn/simple
# pip install -U pandas -i https://pypi.tuna.tsinghua.edu.cn/simple
import random
import time

import pandas as pd
import requests
from pyquery import PyQuery as pq

# Number of pages to request. There are 70-odd pages at most; set this as
# needed, or use a large value such as 10000 to fetch everything.
pagenum = 30
# Name of the output file
file_csv = "res.csv"


# Fetch the block list page by page and append key/value rows to the CSV
def get_info():
    # Clear the output left over from the previous run
    with open(file_csv, 'w', encoding='utf-8') as f:
        f.write("")
    for i in range(pagenum):
        print("Page {} started".format(i + 1))
        # Headers copied from the browser via cURL; the sec-ch-ua values are
        # caret-escaped artifacts of "Copy as cURL (cmd)" on Windows
        headers = {
            'authority': 'eth.bitaps.com',
            'accept': '*/*',
            'accept-language': 'zh-CN,zh;q=0.9',
            'content-length': '0',
            'cookie': 'hide_dashboard=0',
            'origin': 'https://eth.bitaps.com',
            'referer': 'https://eth.bitaps.com/blocks?page=23',
            'sec-ch-ua': '^\\^',
            'sec-ch-ua-mobile': '?0',
            'sec-ch-ua-platform': '^\\^Windows^\\^',
            'sec-fetch-dest': 'empty',
            'sec-fetch-mode': 'cors',
            'sec-fetch-site': 'same-origin',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.60 Safari/537.36',
            'x-requested-with': 'XMLHttpRequest',
        }
        params = (('page', i + 1),)
        response = requests.post('https://eth.bitaps.com/blocks/list',
                                 headers=headers, params=params).content.decode()
        doc = pq(response)
        # Walk every table on the page; each table holds one block's fields
        for item in doc(".tml-table-wrap").items():
            key_list = []
            value_list = []
            for tr in item("tr").items():
                key = tr("td:nth-child(1)").text()
                # The row text starts with the key; slice it off to keep only the
                # value (str.strip(key) would strip characters, not the prefix)
                value = tr("td").text()[len(key):].strip()
                key_list.append(key)
                value_list.append(value)
            print("key_list:{}".format(key_list))
            print("value_list:{}".format(value_list))
            # Append this table's key/value rows to the CSV
            dataframe = pd.DataFrame({'key_list': key_list, 'value': value_list})
            dataframe.to_csv(file_csv, mode='a', index=False, sep=',')
        time.sleep(random.randint(1, 5))  # requesting too fast gets blocked
        print("Page {} finished".format(i + 1))


def main():
    get_info()


if __name__ == '__main__':
    main()
Running it yields a CSV of key/value rows.
How to read it
Browse the list directly; to inspect one particular key, filter for it first, e.g. with pandas as sketched below.
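For example, to pull every occurrence of one key out of the CSV, a small pandas filter works. This is a sketch: it assumes the column names 'key_list' and 'value' written by the script above, and "Height" stands in for whatever key you want to inspect.

import pandas as pd

df = pd.read_csv("res.csv")
# The script appends a header line per table, so drop the repeated header rows
df = df[df["key_list"] != "key_list"]
# Filter for a single key; "Height" here is just an example key name
print(df[df["key_list"] == "Height"]["value"])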
Example code 02
This version uses pyppeteer. However, the site's next-page button is buggy: clicking it sometimes jumps to a random page, so this code is for reference only (a workaround sketch follows the code).
import asyncio
import json

from pyppeteer import launch
from pyquery import PyQuery as pq

# Start page
url = 'https://eth.bitaps.com/blocks?page=1'
file_json = "res03.json"
# Maximum number of pages to collect; note the site currently shows 65 pages in total
pagenum = 3


async def get_info():
    current_page = 1
    browser = await launch(headless=False, ignoreHTTPSErrors=True,
                           args=['--disable-infobars', '--window-size=1920,1080'])
    page = await browser.newPage()
    # Hide the webdriver flag before the first navigation to avoid detection
    await page.evaluateOnNewDocument(
        'Object.defineProperty(navigator, "webdriver", {get: () => undefined})')
    # Load the first page once; later pages are reached by clicking "next"
    await page.goto(url, {'timeout': 10000 * 20})
    while True:
        # Collect the current page
        print("Current page: {}".format(current_page))
        await page.waitForSelector(".timeline")
        doc = pq(await page.content())
        kv_li_list = []
        for li in doc(".timeline").items():
            time_date = li(".time-date .bold").text()
            # print("time:\n{}".format(time_date))
            # Scope the table lookup to this timeline entry
            for item in li(".tml-table-wrap").items():
                value_list_tr = []
                key_list_tr = []
                for tr in item("tr").items():
                    key = tr("td:nth-child(1)").text()
                    # Slice off the leading key text, keeping only the value
                    value = tr("td").text()[len(key):].strip()
                    key_list_tr.append(key)
                    value_list_tr.append(value)
                kv_li_list += list(zip(key_list_tr, value_list_tr))
        print("kv_li_list:\n{}".format(kv_li_list))
        # Append this page's pairs as one JSON line
        with open(file_json, mode='a', encoding='utf-8') as f:
            f.write(json.dumps(kv_li_list, ensure_ascii=False) + "\n")
        current_page += 1
        if current_page > pagenum:
            break
        # Click "next page" -- note: this button itself is buggy and can
        # jump to a random page
        await page.click('li:last-child')
        await page.waitForSelector(".timeline")
    await browser.close()


if __name__ == '__main__':
    # Clear the output left over from the previous run
    with open(file_json, 'w', encoding='utf-8') as f:
        f.write("")
    # Run the coroutine on an event loop
    asyncio.get_event_loop().run_until_complete(get_info())
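Since the next-page button is unreliable, one workaround is to skip clicking entirely and navigate straight to each page's URL. This is a sketch under the assumption that the ?page=N query parameter is honored, as the requests version in example code 01 suggests:

import asyncio
from pyppeteer import launch


async def get_info_by_url(pagenum=3):
    browser = await launch(headless=False, ignoreHTTPSErrors=True)
    page = await browser.newPage()
    for n in range(1, pagenum + 1):
        # Go straight to each page instead of clicking the buggy button
        await page.goto('https://eth.bitaps.com/blocks?page={}'.format(n), {'timeout': 10000 * 20})
        await page.waitForSelector(".timeline")
        # ... parse page.content() with pyquery exactly as in get_info() ...
        print("collected page {}".format(n))
    await browser.close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(get_info_by_url())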