FQNovel2Spider/main.py

# encoding: utf-8
# version: 0.0.1
# author: qianyi
# email: xxqianyi@163.com
# date: 2024-10-13
# description: scrape novels from Fanqie Novel (番茄小说)
import requests
import parsel
import random
import time
import json
from pathlib import Path
from fake_useragent import UserAgent


class NovelSpider:
    def __init__(self):
        self.URL = 'https://fanqienovel.com'
        self.cookie = ''
        self.headers = {
            'User-Agent': UserAgent().random,
            'cookie': self.cookie
        }
        self.novel_dict = {}

    @staticmethod
    def decrypt_chapter_content(content):
        # Map obfuscated code points back to real characters using the
        # font mapping table in woff2.json.
        with open('woff2.json', 'r', encoding='utf-8') as f:
            woff2_dict = json.load(f)
        converted_content = ""
        for char in content:
            try:
                converted_content += woff2_dict[str(ord(char))]
            except KeyError:
                # Characters not in the mapping (punctuation, newlines, ...) pass through unchanged.
                converted_content += char
        return converted_content
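
    # A minimal sketch of the expected woff2.json layout: keys are the string
    # code points of the obfuscated glyphs, values are the real characters.
    # The numbers and characters below are placeholders, not values taken from
    # the actual font map:
    #   {
    #       "58344": "的",
    #       "58345": "一"
    #   }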

    def search_novel(self, key):
        """
        Search for a novel.
        :param key: search keyword
        :return: novel ID
        """
        while True:
            if key == '':
                return 'b'
            # Query the search API; only the first page of results is fetched,
            # so passback stays 0.
            url = (f"https://api5-normal-lf.fqnovel.com/reading/bookapi/search/page/v/"
                   f"?query={key}&aid=1967&channel=0&os_version=0&device_type=0"
                   f"&device_platform=0&iid=466614321180296&passback=0&version_code=999")
            response = requests.get(url)
            if response.status_code == 200:
                data = response.json()
                if data['code'] == 0:
                    books = data['data']
                    if not books:
                        print("没有找到相关书籍。")
                        break
                    for index, book in enumerate(books):
                        print(f"{index + 1}. 名称:{book['book_data'][0]['book_name']} "
                              f"作者:{book['book_data'][0]['author']} "
                              f"ID:{book['book_data'][0]['book_id']} "
                              f"字数:{book['book_data'][0]['word_number']}")
                    while True:
                        choice_ = input("请选择一个结果, 输入 r 以重新搜索:")
                        if choice_ == "r":
                            key = input("请输入要搜索的小说名:")
                            break
                        elif choice_.isdigit() and 1 <= int(choice_) <= len(books):
                            chosen_book = books[int(choice_) - 1]
                            self.novel_dict['book_id'] = chosen_book['book_data'][0]['book_id']
                            return chosen_book['book_data'][0]['book_id']
                        else:
                            print("输入无效,请重新输入。")
                else:
                    print("搜索出错,错误码:", data['code'])
                    break
            else:
                print("请求失败,状态码:", response.status_code)
                break

    def get_novel_info(self, book_id):
        """
        Fetch the novel's metadata and chapter list.
        :param book_id: novel ID
        :return: None (results are stored in self.novel_dict)
        """
        # Build the novel info dictionary from the book page.
        url = f"{self.URL}/page/{book_id}"
        response = requests.get(url, headers=self.headers)
        novel_selector = parsel.Selector(response.content.decode('utf-8'))
        self.novel_dict['title'] = novel_selector.css('.info-name h1::text').get()
        self.novel_dict['author'] = novel_selector.css('.author-name-text::text').get()
        self.novel_dict['word_count'] = novel_selector.css('.detail::text').getall()
        self.novel_dict['intro'] = novel_selector.css('.page-abstract-content p::text').get()
        self.novel_dict['chapter_latest'] = ' '.join(
            title for title in novel_selector.css('.info-last-title:nth-child(1)::text').getall()
            if title != '最近更新:')
        self.novel_dict['chapter_names'] = novel_selector.css('.chapter-item .chapter-item-title::text').getall()
        self.novel_dict['chapter_links'] = novel_selector.css('.chapter-item .chapter-item-title::attr(href)').getall()
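        # Shape of self.novel_dict after this call; the keys come from the
        # assignments above, the values shown are illustrative placeholders:
        #   {
        #       'book_id': '...',            # set by search_novel
        #       'title': '...', 'author': '...', 'intro': '...',
        #       'word_count': [...], 'chapter_latest': '...',
        #       'chapter_names': ['...', ...],
        #       'chapter_links': ['/...', ...],
        #   }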

    def get_novel_content(self, chapter_link):
        """
        Fetch the content of one chapter.
        :param chapter_link: chapter link (relative path)
        :return: decrypted chapter text, or an empty string on failure
        """
        url = f"{self.URL}{chapter_link}"
        self.headers['cookie'] = self.cookie
        try:
            response = requests.get(url, headers=self.headers)
            chapter_selector = parsel.Selector(response.content.decode('utf-8'))
            chapter_name = chapter_selector.css('.muye-reader-title::text').get()
            chapter_contents = chapter_selector.css('.muye-reader-content-16 p::text').getall()
            # Decrypt the custom-font obfuscated text.
            chapter_content = self.decrypt_chapter_content('\n\n'.join(chapter_contents))
            print(f"{chapter_name}章节内容长度:{len(chapter_content)}")
            return chapter_content
        except requests.RequestException:
            print("获取章节内容失败。")
            return ''

    def get_cookie(self, t):
        """
        Obtain a usable cookie.
        :param t: an existing cookie string; pass '' to generate a new one
        :return: 'ok' if the cookie works, 'err' otherwise
        """
        bas = 1000000000000000000  # base for the 19-digit novel_web_id
        if t == '':  # no cookie supplied
            # Brute-force a random novel_web_id until one returns real chapter text.
            for num in range(random.randint(bas * 6, bas * 8), bas * 9):
                time.sleep(random.randint(50, 150) / 1000)  # random pause
                self.cookie = 'novel_web_id=' + str(num)  # build the cookie
                self.headers['cookie'] = self.cookie
                link = self.random_str('chapter_links')
                print(f"尝试使用cookie {self.cookie} 获取 {link} 链接")
                # 200 ms gap to avoid getting the IP banned
                time.sleep(0.2)
                if len(self.get_novel_content(link)) > 200:
                    return 'ok'
        else:  # a cookie was supplied
            self.cookie = t
            self.headers['cookie'] = t
            link = self.random_str('chapter_links')
            print(f"尝试使用cookie {self.cookie} 获取 {link} 链接")
            if len(self.get_novel_content(link)) > 200:
                return 'ok'
            else:
                return 'err'
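
    # Usage sketch (assumes get_novel_info() has already been called so that
    # novel_dict['chapter_links'] is filled; get_cookie() validates the cookie
    # against a randomly chosen chapter):
    #
    #   spider = NovelSpider()
    #   spider.get_novel_info(book_id)   # book_id is a placeholder value
    #   if spider.get_cookie('') == 'ok':
    #       print(spider.cookie)         # e.g. 'novel_web_id=7...'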

    def random_str(self, key):
        """
        Pick a random element from a list stored in novel_dict.
        :param key: dictionary key
        :return: a random element
        """
        try:
            values = self.novel_dict[key]
            if values:
                # Skip the first 10 entries; fall back to the full list if there are fewer.
                return random.choice(values[10:] or values)
        except KeyError:
            print("返回值为空,请先获取小说信息。")

    def down_text(self):
        file = FileHandler()
        for name, link in zip(self.novel_dict['chapter_names'], self.novel_dict['chapter_links']):
            time.sleep(1)
            text = self.get_novel_content(link)
            file.book2down('text', self.novel_dict['title'], name, text)


class FileHandler:
    def __init__(self):
        self.config = {}
        self.path = Path('Data')
        self.config_file_path = self.path / 'fq2s.cookie'
        self.download_dir_path = self.path / 'books'
        # Create the Data and books directories if they do not exist yet.
        if not self.path.exists() or not self.download_dir_path.exists():
            self.download_dir_path.mkdir(parents=True, exist_ok=True)
            print(f"目录 {self.path} , {self.download_dir_path} 已创建。")

    def read_config(self):
        """
        Read the configuration file.
        :return: None (the cookie is stored in self.config)
        """
        # Check whether the cookie file exists.
        if self.config_file_path.exists():
            # If it exists, read its content (strip whitespace and any surrounding quotes).
            self.config['cookie'] = self.config_file_path.read_text(encoding='utf-8').strip().strip('"')
        else:
            # Otherwise create it with empty default content.
            default_content = ""
            self.config_file_path.write_text(default_content, encoding='utf-8')
            print(f"配置文件 {self.config_file_path} 不存在,已创建并写入默认内容。")

    def write_config(self, con):
        """
        Write the configuration file.
        :param con: configuration content (the cookie string)
        :return: None
        """
        with self.config_file_path.open('w', encoding='utf-8') as f:
            f.write(con)
        print("配置文件写入成功。")

    def book2down(self, down_type, b_name, c_name, text):
        """
        Save a chapter to disk.
        :param down_type: download format ('text' or 'json')
        :param b_name: novel name
        :param c_name: chapter name
        :param text: chapter content
        :return: None
        """
        match down_type:
            case 'text':
                # Save as a txt file.
                book_name_path = self.download_dir_path / f"{b_name}"
                book_name_path.mkdir(parents=True, exist_ok=True)
                file_path = book_name_path / f"{c_name}.txt"
                file_path.write_text(text, encoding='utf-8')
                print(f"已下载 {c_name} 章节。")
            case 'json':
                # Saving as json is not implemented yet.
                pass


def run(mode=0):
    print("欢迎使用番茄小说爬虫!\n初始化中................................................")
    novel_spider = NovelSpider()
    file_handler = FileHandler()
    # Fetch a fixed book first so that novel_dict['chapter_links'] is populated;
    # get_cookie() needs those links to test whether a cookie works.
    novel_spider.get_novel_info(6982529841564224526)
    file_handler.read_config()
    cookie = file_handler.config.get('cookie', '')
    novel_spider.cookie = cookie
    if cookie == '':
        print("获取cookie------------")
        status = novel_spider.get_cookie('')
        if status == 'ok':
            print("获取cookie成功")
            file_handler.write_config(novel_spider.cookie)
    else:
        status = novel_spider.get_cookie(cookie)
        if status == 'err':
            print("该cookie已失效重新获取。")
            novel_spider.get_cookie('')
            file_handler.write_config(novel_spider.cookie)
            print("获取cookie成功")
        elif status == 'ok':
            print("cookie可用")
    if mode == 1:
        # GUI mode is not implemented yet.
        pass
    else:
        # No-GUI (command-line) mode.
        while True:
            m = input("请输入功能模块[1.下载小说|2.x|3.x|4.x|5.x|6.x|7.x|8.x|9.x|0.退出]")
            match m:
                case '1':
                    # Download a novel.
                    # t = input("请输入下载类型[1.txt|2.json]")
                    bid = novel_spider.search_novel(input("请输入要搜索的小说名:"))
                    novel_spider.get_novel_info(bid)
                    novel_spider.down_text()
                case '2':
                    pass
                case '0':
                    break


# Program entry point
if __name__ == '__main__':
    # Launch mode: nogui(0) | gui(1)
    # i = int(input("请输入打开方式[nogui(0)|gui(1)]"))
    i = 0
    run(i)