"""
|
||
文档内容获取服务(轻量版)
|
||
|
||
只做一件事:从 URL/GitHub 获取文档内容,转换成 Markdown reference 文件。
|
||
不涉及复杂的爬取、代码分析、脚本执行等功能。
|
||
|
||
适用于:
|
||
- 用户输入文档 URL,自动获取内容作为 references
|
||
- 简单的文档解析和清理
|
||
- 与现有 LLM 生成流程配合
|
||
"""
|
||
|
||
import asyncio
import re
from typing import Optional, List, Dict, Any, TYPE_CHECKING
from pathlib import Path
import logging

try:
    import httpx
    from bs4 import BeautifulSoup
    HTTPX_AVAILABLE = True
except ImportError:
    HTTPX_AVAILABLE = False
    # TYPE_CHECKING is always False at runtime, so BeautifulSoup won't be imported
    if TYPE_CHECKING:
        from bs4 import BeautifulSoup

from app.utils.logger import get_logger

logger = get_logger(__name__)


class DocumentationFetcher:
    """
    Lightweight documentation fetcher.

    What it does:
    1. Fetch page content from a URL
    2. Extract the main content (strip navigation, ads, and similar noise)
    3. Convert it to Markdown
    4. Clean up and format the result

    What it does not do:
    - Complex crawling (no recursive fetching)
    - Code analysis
    - Script execution
    """

    def __init__(self, timeout: int = 30):
        """
        Initialize the fetcher.

        Args:
            timeout: Request timeout in seconds
        """
        self.timeout = timeout

        if not HTTPX_AVAILABLE:
            logger.warning("httpx or beautifulsoup4 is not installed; some features are unavailable")
            logger.warning("Install them with: pip install httpx beautifulsoup4")

    async def fetch_from_url(
        self,
        url: str,
        selector: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Fetch documentation content from a URL.

        Args:
            url: Documentation URL
            selector: CSS selector (optional, used to locate the main content)

        Returns:
            Dict describing the fetch result
        """
        if not HTTPX_AVAILABLE:
            return {
                "success": False,
                "error": "httpx is not installed; run: pip install httpx"
            }

        logger.info(f"Fetching documentation: {url}")

        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(url, follow_redirects=True)
                response.raise_for_status()

                # Parse the HTML
                soup = BeautifulSoup(response.text, "html.parser")

                # Extract the title and main content
                title = self._extract_title(soup)
                content = self._extract_main_content(soup, selector)

                # Convert to Markdown
                markdown = self._html_to_markdown(title, content, url)

                # Clean up
                markdown = self._clean_markdown(markdown)

                return {
                    "success": True,
                    "url": url,
                    "title": title,
                    "content": markdown,
                    "word_count": len(markdown.split()),
                    "char_count": len(markdown)
                }

        except httpx.TimeoutException:
            return {
                "success": False,
                "error": f"Request timed out: {url}"
            }
        except httpx.HTTPStatusError as e:
            return {
                "success": False,
                "error": f"HTTP error: {e.response.status_code}"
            }
        except Exception as e:
            logger.error(f"Failed to fetch documentation: {str(e)}")
            return {
                "success": False,
                "error": str(e)
            }

    async def fetch_from_github(
        self,
        repo_url: str,
        docs_path: str = "README.md"
    ) -> Dict[str, Any]:
        """
        Fetch documentation from GitHub.

        Args:
            repo_url: Repository URL (e.g. https://github.com/owner/repo)
            docs_path: Path to the document (e.g. README.md, docs/README.md)

        Returns:
            Dict describing the fetch result
        """
        if not HTTPX_AVAILABLE:
            return {
                "success": False,
                "error": "httpx is not installed"
            }

        logger.info(f"Fetching GitHub document: {repo_url}/{docs_path}")

        try:
            # Parse owner/repo from the URL
            match = re.match(r'https://github\.com/([^/]+)/([^/]+)', repo_url)
            if not match:
                return {
                    "success": False,
                    "error": "Invalid GitHub URL"
                }

            owner, repo = match.groups()

            # Fetch the file via raw.githubusercontent.com
            raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/main/{docs_path}"

            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(raw_url, follow_redirects=True)

                # If the main branch does not exist, fall back to master
                if response.status_code == 404:
                    raw_url = raw_url.replace("/main/", "/master/")
                    response = await client.get(raw_url, follow_redirects=True)

                response.raise_for_status()

                content = response.text

                return {
                    "success": True,
                    "repo_url": repo_url,
                    "docs_path": docs_path,
                    "content": content,
                    "word_count": len(content.split()),
                    "char_count": len(content)
                }

        except Exception as e:
            logger.error(f"Failed to fetch GitHub document: {str(e)}")
            return {
                "success": False,
                "error": str(e)
            }

    def _extract_title(self, soup: Any) -> str:
        """Extract the page title."""
        # Prefer the first h1
        h1 = soup.find("h1")
        if h1:
            return h1.get_text().strip()

        # Fall back to the <title> tag
        title_tag = soup.find("title")
        if title_tag:
            return title_tag.get_text().strip()

        return "Untitled"

    def _extract_main_content(
        self,
        soup: Any,
        selector: Optional[str]
    ) -> Any:
        """Extract the main content area."""

        # If a selector was given, use it
        if selector:
            main = soup.select_one(selector)
            if main:
                return main

        # Try common content-area selectors
        content_selectors = [
            "article",
            "main",
            '[role="main"]',
            ".content",
            "#content",
            ".documentation",
            ".docs-content",
            "main .content"
        ]

        for sel in content_selectors:
            main = soup.select_one(sel)
            if main:
                return main

        # Fall back to the whole body
        body = soup.find("body")
        return body if body else soup

    def _html_to_markdown(
        self,
        title: str,
        content: Any,
        url: str
    ) -> str:
        """Convert HTML to Markdown."""

        lines = [f"# {title}\n"]
        lines.append(f"Source: {url}\n")

        # Extract headings (h1 is skipped because it is already used as the document title)
        for heading in content.find_all(["h1", "h2", "h3", "h4", "h5", "h6"]):
            if heading.name == "h1":
                continue
            level = int(heading.name[1])
            text = heading.get_text().strip()
            lines.append(f"\n{'#' * level} {text}\n")

        # Extract paragraphs
        paragraphs = []
        for p in content.find_all("p"):
            text = p.get_text().strip()
            if len(text) > 20:  # Keep only meaningful paragraphs
                paragraphs.append(text)

        if paragraphs:
            lines.append("\n## Content\n")
            lines.extend(paragraphs)

        # Extract code blocks
        code_blocks = []
        for pre in content.find_all("pre"):
            code = pre.get_text()
            if len(code) > 10:
                code_blocks.append(f"```\n{code}\n```")

        if code_blocks:
            lines.append("\n## Code Examples\n")
            lines.extend(code_blocks[:5])  # At most 5 code blocks

        # Extract lists
        for ul in content.find_all(["ul", "ol"]):
            items = [f"- {li.get_text().strip()}" for li in ul.find_all("li")]
            if items:
                lines.append("\n")
                lines.extend(items)

        return "\n".join(lines)

    def _clean_markdown(self, markdown: str) -> str:
        """Clean up the Markdown content."""

        # Collapse runs of blank lines
        markdown = re.sub(r'\n{3,}', '\n\n', markdown)

        # Strip navigation-style noise
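        # Note: the (?=\n##) lookahead stops each match at the next "##" heading
        # without consuming it, and re.DOTALL (set below) lets '.*?' span newlines.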
        noise_patterns = [
            r'Table of Contents.*?(?=\n##)',
            r'Navigation.*?(?=\n##)',
            r'Menu.*?(?=\n##)',
            r'Skip to content',
            r'© \d{4}.*',
        ]

        for pattern in noise_patterns:
            markdown = re.sub(pattern, '', markdown, flags=re.IGNORECASE | re.DOTALL)

        return markdown.strip()

    async def fetch_multiple_urls(
        self,
        urls: List[str],
        selector: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Fetch content from multiple URLs concurrently.

        Args:
            urls: List of URLs
            selector: CSS selector

        Returns:
            List of fetch results
        """
        tasks = [self.fetch_from_url(url, selector) for url in urls]
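        # return_exceptions=True keeps one failed fetch from cancelling the others;
        # any raised exception comes back as an item in `results` instead.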
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Normalize exceptions into error result dicts
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    "success": False,
                    "url": urls[i],
                    "error": str(result)
                })
            else:
                processed_results.append(result)

        return processed_results


# Module-level singleton
_doc_fetcher: Optional[DocumentationFetcher] = None


def get_documentation_fetcher() -> DocumentationFetcher:
    """Return the shared DocumentationFetcher instance."""
    global _doc_fetcher
    if _doc_fetcher is None:
        _doc_fetcher = DocumentationFetcher()
    return _doc_fetcher
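

if __name__ == "__main__":
    # Minimal usage sketch, not part of the service itself: the URLs below are
    # placeholders, and writing the result to a reference file is one possible way
    # to use the returned Markdown rather than something this module mandates.
    async def _demo() -> None:
        fetcher = get_documentation_fetcher()

        # Fetch a documentation page and keep the cleaned Markdown
        page = await fetcher.fetch_from_url("https://example.com/docs/getting-started")
        if page["success"]:
            Path("reference.md").write_text(page["content"], encoding="utf-8")

        # Fetch a README straight from GitHub (falls back from main to master)
        readme = await fetcher.fetch_from_github("https://github.com/owner/repo", "README.md")
        if readme["success"]:
            print(f"Fetched {readme['word_count']} words from {readme['docs_path']}")

    asyncio.run(_demo())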