creative_studio/backend/app/core/documentation_fetcher.py

"""
文档内容获取服务(轻量版)
只做一件事:从 URL/GitHub 获取文档内容,转换成 Markdown reference 文件。
不涉及复杂的爬取、代码分析、脚本执行等功能。
适用于:
- 用户输入文档 URL自动获取内容作为 references
- 简单的文档解析和清理
- 与现有 LLM 生成流程配合
"""
import asyncio
import re
from typing import Optional, List, Dict, Any, TYPE_CHECKING
from pathlib import Path
import logging
try:
    import httpx
    from bs4 import BeautifulSoup
    HTTPX_AVAILABLE = True
except ImportError:
    HTTPX_AVAILABLE = False

# TYPE_CHECKING is always False at runtime, so BeautifulSoup won't be imported
if TYPE_CHECKING:
    from bs4 import BeautifulSoup

from app.utils.logger import get_logger
logger = get_logger(__name__)


class DocumentationFetcher:
    """
    Lightweight documentation fetcher.

    What it does:
    1. Fetch page content from a URL
    2. Extract the main content (dropping navigation, ads, etc.)
    3. Convert it to Markdown
    4. Clean up and format the result

    What it does not do:
    - Complex crawling (no recursive scraping)
    - Code analysis
    - Script execution
    """

    def __init__(self, timeout: int = 30):
        """
        Initialize the fetcher.

        Args:
            timeout: Request timeout in seconds
        """
        self.timeout = timeout
        if not HTTPX_AVAILABLE:
            logger.warning("httpx or beautifulsoup4 is not installed; some features are unavailable")
            logger.warning("Install with: pip install httpx beautifulsoup4")

    async def fetch_from_url(
        self,
        url: str,
        selector: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Fetch documentation content from a URL.

        Args:
            url: Documentation URL
            selector: CSS selector (optional, used to locate the main content)

        Returns:
            Dict describing the fetch result
        """
        if not HTTPX_AVAILABLE:
            return {
                "success": False,
                "error": "httpx is not installed; run: pip install httpx"
            }

        logger.info(f"Fetching documentation: {url}")
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(url, follow_redirects=True)
                response.raise_for_status()

                # Parse the HTML
                soup = BeautifulSoup(response.text, "html.parser")

                # Extract the title and main content
                title = self._extract_title(soup)
                content = self._extract_main_content(soup, selector)

                # Convert to Markdown
                markdown = self._html_to_markdown(title, content, url)

                # Clean up
                markdown = self._clean_markdown(markdown)

                return {
                    "success": True,
                    "url": url,
                    "title": title,
                    "content": markdown,
                    "word_count": len(markdown.split()),
                    "char_count": len(markdown)
                }
        except httpx.TimeoutException:
            return {
                "success": False,
                "error": f"Request timed out: {url}"
            }
        except httpx.HTTPStatusError as e:
            return {
                "success": False,
                "error": f"HTTP error: {e.response.status_code}"
            }
        except Exception as e:
            logger.error(f"Failed to fetch documentation: {str(e)}")
            return {
                "success": False,
                "error": str(e)
            }

    async def fetch_from_github(
        self,
        repo_url: str,
        docs_path: str = "README.md"
    ) -> Dict[str, Any]:
        """
        Fetch documentation from GitHub.

        Args:
            repo_url: Repository URL (e.g. https://github.com/owner/repo)
            docs_path: Documentation path (e.g. README.md, docs/README.md)

        Returns:
            Dict describing the fetch result
        """
        if not HTTPX_AVAILABLE:
            return {
                "success": False,
                "error": "httpx is not installed"
            }

        logger.info(f"Fetching GitHub documentation: {repo_url}/{docs_path}")
        try:
            # Parse the repository information
            match = re.match(r'https://github\.com/([^/]+)/([^/]+)', repo_url)
            if not match:
                return {
                    "success": False,
                    "error": "Invalid GitHub URL"
                }
            owner, repo = match.groups()

            # Fetch the file via raw.githubusercontent.com
            raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/main/{docs_path}"
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(raw_url, follow_redirects=True)

                # If the main branch does not exist, fall back to master
                if response.status_code == 404:
                    raw_url = raw_url.replace("/main/", "/master/")
                    response = await client.get(raw_url, follow_redirects=True)

                response.raise_for_status()
                content = response.text

            return {
                "success": True,
                "repo_url": repo_url,
                "docs_path": docs_path,
                "content": content,
                "word_count": len(content.split()),
                "char_count": len(content)
            }
        except Exception as e:
            logger.error(f"Failed to fetch GitHub documentation: {str(e)}")
            return {
                "success": False,
                "error": str(e)
            }

    def _extract_title(self, soup: Any) -> str:
        """Extract the page title."""
        # Try the first h1
        h1 = soup.find("h1")
        if h1:
            return h1.get_text().strip()

        # Fall back to the title tag
        title_tag = soup.find("title")
        if title_tag:
            return title_tag.get_text().strip()

        return "Untitled"

    def _extract_main_content(
        self,
        soup: Any,
        selector: Optional[str]
    ) -> Any:
        """Extract the main content area."""
        # Use the caller-supplied selector if one was given
        if selector:
            main = soup.select_one(selector)
            if main:
                return main

        # Try common content-area selectors
        content_selectors = [
            "article",
            "main",
            '[role="main"]',
            ".content",
            "#content",
            ".documentation",
            ".docs-content",
            "main .content"
        ]
        for sel in content_selectors:
            main = soup.select_one(sel)
            if main:
                return main

        # Fall back to the body (or the whole document)
        body = soup.find("body")
        return body if body else soup

    def _html_to_markdown(
        self,
        title: str,
        content: Any,
        url: str
    ) -> str:
        """Convert HTML to Markdown."""
        lines = [f"# {title}\n"]
        lines.append(f"Source: {url}\n")

        # Extract headings (h1 headings are skipped; the title is already rendered above)
        for heading in content.find_all(["h1", "h2", "h3", "h4", "h5", "h6"]):
            if heading.name == "h1":
                continue
            level = int(heading.name[1])
            text = heading.get_text().strip()
            lines.append(f"\n{'#' * level} {text}\n")

        # Extract paragraphs
        paragraphs = []
        for p in content.find_all("p"):
            text = p.get_text().strip()
            if len(text) > 20:  # keep only meaningful paragraphs
                paragraphs.append(text)
        if paragraphs:
            lines.append("\n## Content\n")
            lines.extend(paragraphs)

        # Extract code blocks
        code_blocks = []
        for pre in content.find_all("pre"):
            code = pre.get_text()
            if len(code) > 10:
                code_blocks.append(f"```\n{code}\n```")
        if code_blocks:
            lines.append("\n## Code Examples\n")
            lines.extend(code_blocks[:5])  # at most 5 code blocks

        # Extract lists
        for ul in content.find_all(["ul", "ol"]):
            items = [f"- {li.get_text().strip()}" for li in ul.find_all("li")]
            if items:
                lines.append("\n")
                lines.extend(items)

        return "\n".join(lines)

    def _clean_markdown(self, markdown: str) -> str:
        """Clean up the Markdown content."""
        # Collapse runs of blank lines
        markdown = re.sub(r'\n{3,}', '\n\n', markdown)

        # Strip navigation-style noise
        noise_patterns = [
            r'Table of Contents.*?(?=\n##)',
            r'Navigation.*?(?=\n##)',
            r'Menu.*?(?=\n##)',
            r'Skip to content',
            r'© \d{4}.*',
        ]
        for pattern in noise_patterns:
            markdown = re.sub(pattern, '', markdown, flags=re.IGNORECASE | re.DOTALL)

        return markdown.strip()

    async def fetch_multiple_urls(
        self,
        urls: List[str],
        selector: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Fetch content from multiple URLs concurrently.

        Args:
            urls: List of URLs
            selector: CSS selector

        Returns:
            List of fetch results
        """
        tasks = [self.fetch_from_url(url, selector) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Normalize exceptions into error results
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    "success": False,
                    "url": urls[i],
                    "error": str(result)
                })
            else:
                processed_results.append(result)
        return processed_results


# Module-level singleton
_doc_fetcher: Optional[DocumentationFetcher] = None


def get_documentation_fetcher() -> DocumentationFetcher:
    """Return the shared DocumentationFetcher instance."""
    global _doc_fetcher
    if _doc_fetcher is None:
        _doc_fetcher = DocumentationFetcher()
    return _doc_fetcher
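

# The block below is an illustrative usage sketch, not part of the original module.
# It assumes this file is run directly for a quick manual check; the URLs
# (https://example.com/docs and https://github.com/owner/repo) are placeholders.
if __name__ == "__main__":
    async def _demo() -> None:
        fetcher = get_documentation_fetcher()

        # Fetch a single page; the result dict carries "success", "title",
        # "content" (Markdown), "word_count", and "char_count".
        result = await fetcher.fetch_from_url("https://example.com/docs")
        if result["success"]:
            print(result["title"], result["word_count"])
        else:
            print("fetch failed:", result["error"])

        # Fetch a repository README (falls back from main to master on 404)
        readme = await fetcher.fetch_from_github("https://github.com/owner/repo", "README.md")
        print("github fetch ok:", readme["success"])

    asyncio.run(_demo())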