""" 文档内容获取服务(轻量版) 只做一件事:从 URL/GitHub 获取文档内容,转换成 Markdown reference 文件。 不涉及复杂的爬取、代码分析、脚本执行等功能。 适用于: - 用户输入文档 URL,自动获取内容作为 references - 简单的文档解析和清理 - 与现有 LLM 生成流程配合 """ import asyncio import re from typing import Optional, List, Dict, Any, TYPE_CHECKING from pathlib import Path import logging try: import httpx from bs4 import BeautifulSoup HTTPX_AVAILABLE = True except ImportError: HTTPX_AVAILABLE = False # TYPE_CHECKING is always False at runtime, so BeautifulSoup won't be imported if TYPE_CHECKING: from bs4 import BeautifulSoup from app.utils.logger import get_logger logger = get_logger(__name__) class DocumentationFetcher: """ 轻量级文档获取器 功能: 1. 从 URL 获取网页内容 2. 提取主要内容(去除导航、广告等) 3. 转换为 Markdown 格式 4. 清理和格式化 不做: - 复杂的爬虫(不递归抓取) - 代码分析 - 脚本执行 """ def __init__(self, timeout: int = 30): """ 初始化获取器 Args: timeout: 请求超时时间(秒) """ self.timeout = timeout if not HTTPX_AVAILABLE: logger.warning("httpx 或 beautifulsoup4 未安装,部分功能不可用") logger.warning("请安装: pip install httpx beautifulsoup4") async def fetch_from_url( self, url: str, selector: Optional[str] = None ) -> Dict[str, Any]: """ 从 URL 获取文档内容 Args: url: 文档 URL selector: CSS 选择器(可选,用于定位主内容) Returns: 包含获取结果的字典 """ if not HTTPX_AVAILABLE: return { "success": False, "error": "httpx 未安装,请运行: pip install httpx" } logger.info(f"获取文档: {url}") try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get(url, follow_redirects=True) response.raise_for_status() # 解析 HTML soup = BeautifulSoup(response.text, "html.parser") # 提取内容 title = self._extract_title(soup) content = self._extract_main_content(soup, selector) # 转换为 Markdown markdown = self._html_to_markdown(title, content, url) # 清理 markdown = self._clean_markdown(markdown) return { "success": True, "url": url, "title": title, "content": markdown, "word_count": len(markdown.split()), "char_count": len(markdown) } except httpx.TimeoutException: return { "success": False, "error": f"请求超时: {url}" } except httpx.HTTPStatusError as e: return { "success": False, "error": f"HTTP 错误: {e.response.status_code}" } except Exception as e: logger.error(f"获取文档失败: {str(e)}") return { "success": False, "error": str(e) } async def fetch_from_github( self, repo_url: str, docs_path: str = "README.md" ) -> Dict[str, Any]: """ 从 GitHub 获取文档 Args: repo_url: 仓库 URL (如 https://github.com/owner/repo) docs_path: 文档路径 (如 README.md, docs/README.md) Returns: 包含获取结果的字典 """ if not HTTPX_AVAILABLE: return { "success": False, "error": "httpx 未安装" } logger.info(f"获取 GitHub 文档: {repo_url}/{docs_path}") try: # 解析 repo 信息 match = re.match(r'https://github\.com/([^/]+)/([^/]+)', repo_url) if not match: return { "success": False, "error": "无效的 GitHub URL" } owner, repo = match.groups() # 使用 raw.githubusercontent.com 获取文件 raw_url = f"https://raw.githubusercontent.com/{owner}/{repo}/main/{docs_path}" async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get(raw_url, follow_redirects=True) # 如果 main 分支不存在,尝试 master if response.status_code == 404: raw_url = raw_url.replace("/main/", "/master/") response = await client.get(raw_url, follow_redirects=True) response.raise_for_status() content = response.text return { "success": True, "repo_url": repo_url, "docs_path": docs_path, "content": content, "word_count": len(content.split()), "char_count": len(content) } except Exception as e: logger.error(f"获取 GitHub 文档失败: {str(e)}") return { "success": False, "error": str(e) } def _extract_title(self, soup: Any) -> str: """提取页面标题""" # 尝试从 h1 获取 h1 = soup.find("h1") if h1: return h1.get_text().strip() 
        # Fall back to the <title> tag
        title_tag = soup.find("title")
        if title_tag:
            return title_tag.get_text().strip()

        return "Untitled"

    def _extract_main_content(
        self,
        soup: Any,
        selector: Optional[str]
    ) -> Any:
        """Extract the main content area."""
        # Use the caller-provided selector if one was given
        if selector:
            main = soup.select_one(selector)
            if main:
                return main

        # Try common main-content selectors
        content_selectors = [
            "article",
            "main",
            '[role="main"]',
            ".content",
            "#content",
            ".documentation",
            ".docs-content",
            "main .content"
        ]

        for sel in content_selectors:
            main = soup.select_one(sel)
            if main:
                return main

        # Fall back to the body if nothing matched
        body = soup.find("body")
        return body if body else soup

    def _html_to_markdown(
        self,
        title: str,
        content: Any,
        url: str
    ) -> str:
        """Convert HTML to Markdown."""
        lines = [f"# {title}\n"]
        lines.append(f"Source: {url}\n")

        # Extract headings
        for heading in content.find_all(["h1", "h2", "h3", "h4", "h5", "h6"]):
            if heading.name == "h1":
                continue  # Skip h1 headings (one is already used as the document title)
            level = int(heading.name[1])
            text = heading.get_text().strip()
            lines.append(f"\n{'#' * level} {text}\n")

        # Extract paragraphs
        paragraphs = []
        for p in content.find_all("p"):
            text = p.get_text().strip()
            if len(text) > 20:  # Keep only meaningful paragraphs
                paragraphs.append(text)

        if paragraphs:
            lines.append("\n## Content\n")
            lines.extend(paragraphs)

        # Extract code blocks
        code_blocks = []
        for pre in content.find_all("pre"):
            code = pre.get_text()
            if len(code) > 10:
                code_blocks.append(f"```\n{code}\n```")

        if code_blocks:
            lines.append("\n## Code Examples\n")
            lines.extend(code_blocks[:5])  # At most 5 code blocks

        # Extract lists
        for ul in content.find_all(["ul", "ol"]):
            items = [f"- {li.get_text().strip()}" for li in ul.find_all("li")]
            if items:
                lines.append("\n")
                lines.extend(items)

        return "\n".join(lines)

    def _clean_markdown(self, markdown: str) -> str:
        """Clean up the Markdown content."""
        # Collapse runs of blank lines
        markdown = re.sub(r'\n{3,}', '\n\n', markdown)

        # Remove navigation-style noise
        noise_patterns = [
            r'Table of Contents.*?(?=\n##)',
            r'Navigation.*?(?=\n##)',
            r'Menu.*?(?=\n##)',
            r'Skip to content',
            r'© \d{4}.*',
        ]
        for pattern in noise_patterns:
            markdown = re.sub(pattern, '', markdown, flags=re.IGNORECASE | re.DOTALL)

        return markdown.strip()

    async def fetch_multiple_urls(
        self,
        urls: List[str],
        selector: Optional[str] = None
    ) -> List[Dict[str, Any]]:
        """
        Fetch content from multiple URLs concurrently.

        Args:
            urls: List of URLs
            selector: CSS selector

        Returns:
            List of fetch results
        """
        tasks = [self.fetch_from_url(url, selector) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Convert raised exceptions into error results
        processed_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                processed_results.append({
                    "success": False,
                    "url": urls[i],
                    "error": str(result)
                })
            else:
                processed_results.append(result)

        return processed_results


# Global singleton
_doc_fetcher: Optional[DocumentationFetcher] = None


def get_documentation_fetcher() -> DocumentationFetcher:
    """Return the singleton documentation fetcher."""
    global _doc_fetcher
    if _doc_fetcher is None:
        _doc_fetcher = DocumentationFetcher()
    return _doc_fetcher
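

# --- Usage sketch (illustrative only, not part of the service) ---
# A minimal example of how the fetcher is expected to be driven from async code.
# The URLs below are placeholders, not real documentation sources; this block
# assumes the app package is importable and httpx/beautifulsoup4 are installed.
if __name__ == "__main__":

    async def _demo() -> None:
        fetcher = get_documentation_fetcher()

        # Fetch a single documentation page and print a short summary
        page = await fetcher.fetch_from_url("https://example.com/docs")  # placeholder URL
        if page["success"]:
            print(f"Fetched '{page['title']}' ({page['word_count']} words)")
        else:
            print(f"Fetch failed: {page['error']}")

        # Fetch a README straight from a GitHub repository (placeholder repo)
        readme = await fetcher.fetch_from_github("https://github.com/owner/repo", "README.md")
        if readme["success"]:
            print(f"GitHub fetch succeeded ({readme['char_count']} chars)")
        else:
            print(f"GitHub fetch failed: {readme['error']}")

    asyncio.run(_demo())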