# encoding: utf-8
# version: 0.0.1
# author: qianyi
# email: xxqianyi@163.com
# date: 2024-10-13
# description: 爬取番茄小说 (scrapes novels from fanqienovel.com)

import json
import random
import re
import time
from pathlib import Path

import parsel
import requests
from fake_useragent import UserAgent


class NovelSpider:
    """Console scraper for novels on fanqienovel.com.

    Instance state:
      * ``URL`` - site root that page and chapter paths are appended to.
      * ``cookie`` - the cookie string currently in use.
      * ``headers`` - request headers (random User-Agent plus the cookie).
      * ``novel_dict`` - metadata of the most recently loaded novel.
    """

    # Seconds to wait for an HTTP response; without a timeout a stalled
    # connection would block the program indefinitely.
    REQUEST_TIMEOUT = 10

    # Endpoint queried by search_novel().
    SEARCH_API = "https://api5-normal-lf.fqnovel.com/reading/bookapi/search/page/v/"

    def __init__(self):
        self.URL = 'https://fanqienovel.com'
        self.cookie = ''
        self.headers = {
            'User-Agent': UserAgent().random,
            'cookie': self.cookie
        }
        self.novel_dict = {}

    @staticmethod
    def decrypt_chapter_content(content):
        """Translate obfuscated chapter text using the table in ``woff2.json``.

        ``woff2.json`` (read from the current working directory) maps the
        decimal code point of a character, as a string, to its replacement.
        Characters without an entry are kept unchanged.

        :param content: obfuscated chapter text
        :return: text with every mapped character replaced
        :raises OSError: if ``woff2.json`` cannot be opened
        """
        with open('woff2.json', 'r', encoding='utf-8') as f:
            woff2_dict = json.load(f)
        # Single join instead of repeated concatenation; dict.get supplies
        # the "keep the character" fallback without a bare except.
        return ''.join(woff2_dict.get(str(ord(char)), char) for char in content)

    def _fetch_search_results(self, key):
        """Query the search API for *key*.

        :return: list of result entries (possibly empty), or None when the
                 request or the API call failed (a message is printed)
        """
        params = {
            'query': key,
            'aid': 1967,
            'channel': 0,
            'os_version': 0,
            'device_type': 0,
            'device_platform': 0,
            'iid': 466614321180296,
            # The original URL sent the un-evaluated template text
            # "{(page-1)*10}" here; for the first page that expression is 0.
            # NOTE(review): presumably a result offset - confirm with the API.
            'passback': 0,
            'version_code': 999,
        }
        try:
            # Passing params lets requests URL-encode the keyword.
            response = requests.get(self.SEARCH_API, params=params,
                                    timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException as exc:
            print("请求失败:", exc)
            return None
        if response.status_code != 200:
            print("请求失败,状态码:", response.status_code)
            return None
        try:
            data = response.json()
        except ValueError as exc:
            print("请求失败:", exc)
            return None
        if data['code'] != 0:
            print("搜索出错,错误码:", data['code'])
            return None
        return data['data'] or []

    def search_novel(self, key):
        """Search for a novel and let the user pick one of the results.

        Entering ``r`` at the selection prompt asks for a new keyword and
        searches again.

        :param key: search keyword
        :return: the chosen book id; ``'b'`` when the keyword is empty;
                 None when the search failed or returned nothing
        """
        while True:
            if key == '':
                return 'b'
            books = self._fetch_search_results(key)
            if books is None:
                return None
            if not books:
                print("没有找到相关书籍。")
                return None
            for index, book in enumerate(books, start=1):
                info = book['book_data'][0]
                print(
                    f"{index}. 名称:{info['book_name']} 作者:{info['author']} ID:{info['book_id']} 字数:{info['word_number']}")
            while True:
                choice_ = input("请选择一个结果, 输入 r 以重新搜索:")
                if choice_ == "r":
                    break
                # isdecimal() only accepts characters int() can parse.
                if choice_.isdecimal() and 1 <= int(choice_) <= len(books):
                    book_id = books[int(choice_) - 1]['book_data'][0]['book_id']
                    self.novel_dict['book_id'] = book_id
                    return book_id
                print("输入无效,请重新输入。")
            # Re-search: without a new keyword the same query would simply
            # be repeated forever.
            key = input("请输入要搜索的小说名:")

    def get_novel_info(self, book_id):
        """Load a novel's page and store its metadata in ``novel_dict``.

        Fills the keys ``title``, ``author``, ``word_count``, ``intro``,
        ``chapter_latest``, ``chapter_names`` and ``chapter_links``.

        :param book_id: novel id
        :return: the updated ``novel_dict``
        """
        url = f"{self.URL}/page/{book_id}"
        response = requests.get(url, headers=self.headers,
                                timeout=self.REQUEST_TIMEOUT)
        novel_selector = parsel.Selector(response.content.decode('utf-8'))
        self.novel_dict['title'] = novel_selector.css('.info-name h1::text').get()
        self.novel_dict['author'] = novel_selector.css('.author-name-text::text').get()
        self.novel_dict['word_count'] = novel_selector.css('.detail::text').getall()
        self.novel_dict['intro'] = novel_selector.css('.page-abstract-content p::text').get()
        # Drop the fixed label text so only the chapter title remains.
        self.novel_dict['chapter_latest'] = ' '.join(
            title
            for title in novel_selector.css('.info-last-title:nth-child(1)::text').getall()
            if title != '最近更新:'
        )
        self.novel_dict['chapter_names'] = novel_selector.css(
            '.chapter-item .chapter-item-title::text').getall()
        self.novel_dict['chapter_links'] = novel_selector.css(
            '.chapter-item .chapter-item-title::attr(href)').getall()
        return self.novel_dict

    def get_novel_content(self, chapter_link):
        """Download and decode the text of one chapter.

        The request is sent with ``self.cookie``.

        :param chapter_link: chapter path relative to ``URL``
        :return: decoded chapter text, or None when the link is missing or
                 the chapter could not be fetched or decoded
        """
        if not chapter_link:
            return None
        url = f"{self.URL}{chapter_link}"
        self.headers['cookie'] = self.cookie
        try:
            response = requests.get(url, headers=self.headers,
                                    timeout=self.REQUEST_TIMEOUT)
            chapter_selector = parsel.Selector(response.content.decode('utf-8'))
            chapter_name = chapter_selector.css('.muye-reader-title::text').get()
            chapter_contents = chapter_selector.css('.muye-reader-content-16 p::text').getall()
            # Undo the font-based character obfuscation.
            chapter_content = self.decrypt_chapter_content('\n\n'.join(chapter_contents))
        except Exception as exc:
            # Best effort by design, but unlike a bare except this does not
            # swallow KeyboardInterrupt / SystemExit.
            print("获取章节内容失败。", exc)
            return None
        print(f"{chapter_name}章节内容长度:{len(chapter_content)}")
        return chapter_content

    def _cookie_unlocks(self, link):
        """Return True when ``self.cookie`` yields more than 200 characters for *link*."""
        content = self.get_novel_content(link)
        return content is not None and len(content) > 200

    def get_cookie(self, t):
        """Validate a cookie, or search for a working one.

        A cookie counts as working when a randomly chosen chapter of the
        loaded novel decodes to more than 200 characters.

        :param t: cookie string to validate; ``''`` means "generate
                  candidate cookies until one works"
        :return: ``'ok'`` when ``self.cookie`` holds a working cookie,
                 ``'err'`` otherwise
        """
        # Without chapter links nothing can be probed; the generation loop
        # below would otherwise spin practically forever.
        if self.random_str('chapter_links') is None:
            return 'err'

        if t != '':
            # get_novel_content always sends self.cookie, so the candidate
            # has to be stored there to actually be tested.
            self.cookie = t
            self.headers['cookie'] = t
            link = self.random_str('chapter_links')
            print(f"尝试使用cookie {self.cookie} 获取 {link} 链接")
            return 'ok' if self._cookie_unlocks(link) else 'err'

        bas = 1000000000000000000  # base magnitude of candidate ids
        for num in range(random.randint(bas * 6, bas * 8), bas * 9):
            time.sleep(random.randint(50, 150) / 1000)  # random pause
            self.cookie = 'novel_web_id=' + str(num)
            self.headers['cookie'] = self.cookie
            link = self.random_str('chapter_links')
            print(f"尝试使用cookie {self.cookie} 获取 {link} 链接")
            # 200 ms between attempts to avoid an IP ban.
            time.sleep(0.2)
            if self._cookie_unlocks(link):
                return 'ok'
        return 'err'

    def random_str(self, key):
        """Pick a random element of the list stored under *key* in ``novel_dict``.

        Elements after the first ten are preferred.
        NOTE(review): presumably the first chapters are readable without a
        valid cookie and therefore cannot validate one - confirm.
        Shorter lists fall back to the whole list instead of raising
        IndexError.

        :param key: key into ``novel_dict``
        :return: a random element, or None when the key is missing or the
                 list is empty
        """
        try:
            values = self.novel_dict[key]
        except KeyError:
            print("返回值为空,请先获取小说信息。")
            return None
        if not values:
            return None
        return random.choice(values[10:] or values)

    def down_text(self):
        """Download every chapter of the loaded novel as a text file."""
        file = FileHandler()
        for name, link in zip(self.novel_dict['chapter_names'],
                              self.novel_dict['chapter_links']):
            time.sleep(1)
            text = self.get_novel_content(link)
            if text is None:
                # The failure was already reported; skip instead of handing
                # None to the file writer.
                continue
            file.book2down('text', self.novel_dict['title'], name, text)


class FileHandler:
|
||
def __init__(self):
|
||
self.config = {}
|
||
self.path = Path('Data')
|
||
self.config_file_path = self.path / 'fq2s.cookie'
|
||
self.download_dir_path = self.path / 'books'
|
||
|
||
# 如果 Data 目录不存在,则创建
|
||
if not self.path.exists():
|
||
self.path.mkdir(parents=True)
|
||
self.download_dir_path.mkdir(parents=True)
|
||
print(f"目录 {self.path} , {self.download_dir_path} 已创建。")
|
||
|
||
def read_config(self,):
|
||
"""
|
||
读取配置文件
|
||
:param : 配置文件路径
|
||
:return: 配置字典
|
||
"""
|
||
# 检查 fq2s.conf 文件是否存在
|
||
if self.config_file_path.exists():
|
||
# 如果文件存在,则读取内容
|
||
self.config['cookie'] = self.config_file_path.read_text(encoding='utf-8')
|
||
# print(f"配置文件 {self.config_file_path} 读取成功。")
|
||
else:
|
||
# 如果文件不存在,则创建文件并写入默认内容
|
||
default_content = ""
|
||
self.config_file_path.write_text(default_content, encoding='utf-8')
|
||
print(f"配置文件 {self.config_file_path} 不存在,已创建并写入默认内容。")
|
||
|
||
def write_config(self, con):
|
||
"""
|
||
写入配置文件
|
||
:param con: 配置内容
|
||
:return:
|
||
"""
|
||
cons = f'"{con}"'
|
||
with self.config_file_path.open('w', encoding='utf-8') as f:
|
||
f.write(cons)
|
||
print("配置文件写入成功。")
|
||
|
||
def book2down(self, down_type, b_name, c_name, text):
|
||
"""
|
||
下载小说
|
||
:param down_type: 下载类型
|
||
:param b_name: 小说名
|
||
:param c_name: 章节名
|
||
:param text: 章节内容
|
||
:return:
|
||
"""
|
||
match down_type:
|
||
case 'text':
|
||
# 下载txt格式
|
||
book_name_path = self.download_dir_path / f"{b_name}"
|
||
book_name_path.mkdir(parents=True, exist_ok=True)
|
||
file_path = book_name_path / f"{c_name}.txt"
|
||
file_path.write_text(text, encoding='utf-8')
|
||
print(f"已下载 {c_name} 章节。")
|
||
case 2:
|
||
# 下载json格式
|
||
pass
|
||
def run(mode=0):
|
||
print("欢迎使用番茄小说爬虫!\n初始化中................................................")
|
||
novel_spider = NovelSpider()
|
||
file_handler = FileHandler()
|
||
novel_spider.get_novel_info(6982529841564224526)
|
||
file_handler.read_config()
|
||
cookie = file_handler.config.get('cookie', '')
|
||
novel_spider.cookie = cookie
|
||
if cookie == '':
|
||
print("获取cookie------------")
|
||
status = novel_spider.get_cookie('')
|
||
if status == 'ok':
|
||
print("获取cookie成功!")
|
||
file_handler.write_config(novel_spider.cookie)
|
||
else:
|
||
status = novel_spider.get_cookie(cookie)
|
||
if status == 'err':
|
||
print("该cookie已失效,重新获取。")
|
||
novel_spider.get_cookie('')
|
||
file_handler.write_config(novel_spider.cookie)
|
||
print("获取cookie成功!")
|
||
elif status == 'ok':
|
||
print("cookie可用!")
|
||
if mode == 1:
|
||
pass
|
||
else:
|
||
# nogui模式
|
||
while True:
|
||
m = input("请输入功能模块[1.下载小说|2.x|3.x|4.x|5.x|6.x|7.x|8.x|9.x||0.退出]:")
|
||
match m:
|
||
case '1':
|
||
# 下载小说
|
||
# t = input("请输入下载类型[1.txt|2.json]")
|
||
bid = novel_spider.search_novel(input("请输入要搜索的小说名:"))
|
||
novel_spider.get_novel_info(bid)
|
||
novel_spider.down_text()
|
||
case '2':
|
||
pass
|
||
case '0':
|
||
break
|
||
# Program entry point.
if __name__ == '__main__':
    # Launch mode: nogui (0) | gui (1).
    # The mode is fixed to the console variant; it could instead be read
    # from the user with int(input(...)).
    launch_mode = 0
    run(launch_mode)