""" 剧本切分器 支持混合切分策略: 1. 优先按场景标记切分(第X场、Scene X、【场景X】等) 2. 单场超过阈值时按固定长度切分(5000字符,保留500字符重叠) """ import re from typing import List, Dict, Any from dataclasses import dataclass from app.utils.logger import get_logger logger = get_logger(__name__) @dataclass class ScriptSegment: """剧本片段""" index: int # 片段索引(从0开始) content: str # 片段内容 start_pos: int # 在原文中的起始位置 end_pos: int # 在原文中的结束位置 scene_marker: str = "" # 场景标记(如果有) metadata: Dict[str, Any] = None # 额外元数据 def __post_init__(self): if self.metadata is None: self.metadata = {} class ScriptSplitter: """剧本切分器 - 混合切分策略""" # 场景标记的正则表达式模式(支持多种格式) SCENE_PATTERNS = [ r'第[一二三四五六七八九十百千0-9]+[集场幕]', # 第X场/集/幕 r'[第第]?[0-9]+[集场幕]', # 第1场/集/幕 或 1场/集/幕 r'Scene\s*\d+', # Scene 1, Scene 2, ... r'【场景[^\n]*】', # 【场景xxx】 r'【[场景场][^\n]*】', # 【场xxx】 r'\[场景[^\n]*\]', # [场景xxx] r'场景[::][^\n]*', # 场景:xxx r'[0-9]+\.[0-9]+\s*[场景场外]', # 1.1 场景/场/外 ] # 切分阈值 MAX_SEGMENT_LENGTH = 5000 # 单片段最大长度 OVERLAP_LENGTH = 500 # 切分时的重叠长度 CHUNK_THRESHOLD = 8000 # 超过此长度开始切分 def __init__( self, max_segment_length: int = MAX_SEGMENT_LENGTH, overlap_length: int = OVERLAP_LENGTH, chunk_threshold: int = CHUNK_THRESHOLD ): """ 初始化切分器 Args: max_segment_length: 单片段最大长度 overlap_length: 切分时的重叠长度(保持上下文) chunk_threshold: 超过此长度开始切分 """ self.max_segment_length = max_segment_length self.overlap_length = overlap_length self.chunk_threshold = chunk_threshold def split(self, content: str) -> List[ScriptSegment]: """ 切分剧本内容 Args: content: 剧本内容 Returns: 切分后的片段列表 """ if not content or len(content) <= self.chunk_threshold: # 内容较短,不需要切分 return [ScriptSegment( index=0, content=content, start_pos=0, end_pos=len(content), metadata={"split_method": "none", "reason": "content_too_short"} )] # 尝试按场景切分 segments = self._split_by_scenes(content) # 检查是否有片段过长 oversized_segments = [ s for s in segments if len(s.content) > self.max_segment_length ] if oversized_segments: # 对过长片段进行二次切分 segments = self._split_oversized_segments(segments) logger.info(f"剧本切分完成:共 {len(segments)} 个片段") return segments def _split_by_scenes(self, content: str) -> List[ScriptSegment]: """ 按场景标记切分 Args: content: 剧本内容 Returns: 切分后的片段列表 """ # 编译所有场景模式 patterns = [re.compile(pattern, re.MULTILINE) for pattern in self.SCENE_PATTERNS] # 查找所有场景标记位置 scene_positions = [] for pattern in patterns: for match in pattern.finditer(content): scene_positions.append({ 'pos': match.start(), 'marker': match.group(0), 'pattern': pattern.pattern }) # 按位置排序并去重 scene_positions = sorted( list({(p['pos'], p['marker']): p for p in scene_positions}.values()), key=lambda x: x['pos'] ) if not scene_positions: # 没有找到场景标记,使用固定长度切分 return self._split_by_length(content) # 按场景位置切分 segments = [] for i, scene_info in enumerate(scene_positions): start_pos = scene_info['pos'] end_pos = scene_positions[i + 1]['pos'] if i + 1 < len(scene_positions) else len(content) segment_content = content[start_pos:end_pos].strip() if segment_content: segments.append(ScriptSegment( index=i, content=segment_content, start_pos=start_pos, end_pos=end_pos, scene_marker=scene_info['marker'], metadata={ "split_method": "scene", "scene_marker": scene_info['marker'] } )) return segments def _split_by_length(self, content: str) -> List[ScriptSegment]: """ 按固定长度切分(带重叠) Args: content: 剧本内容 Returns: 切分后的片段列表 """ segments = [] content_length = len(content) pos = 0 index = 0 while pos < content_length: end_pos = min(pos + self.max_segment_length, content_length) segment_content = content[pos:end_pos] # 如果不是最后一段,添加重叠 if end_pos < content_length: overlap_content = content[end_pos:end_pos + self.overlap_length] segment_content += overlap_content segments.append(ScriptSegment( index=index, content=segment_content, start_pos=pos, end_pos=end_pos, metadata={ "split_method": "length", "has_overlap": end_pos < content_length } )) # 移动到下一个片段 pos += self.max_segment_length index += 1 return segments def _split_oversized_segments( self, segments: List[ScriptSegment] ) -> List[ScriptSegment]: """ 对过长的片段进行二次切分 Args: segments: 原始片段列表 Returns: 处理后的片段列表 """ result = [] for segment in segments: if len(segment.content) <= self.max_segment_length: # 片段长度合适,直接添加 result.append(segment) else: # 片段过长,进行二次切分 logger.info(f"片段 {segment.index} 过长 ({len(segment.content)} 字符),进行二次切分") sub_segments = self._split_by_length(segment.content) # 重新编号并保留原始元数据 for i, sub_seg in enumerate(sub_segments): result.append(ScriptSegment( index=len(result), content=sub_seg.content, start_pos=segment.start_pos + sub_seg.start_pos, end_pos=segment.start_pos + sub_seg.end_pos, scene_marker=segment.scene_marker if i == 0 else f"{segment.scene_marker}(续)", metadata={ **segment.metadata, "split_method": f"{segment.metadata.get('split_method', '')}_then_length", "parent_scene": segment.scene_marker, "sub_index": i } )) return result def get_split_summary(self, segments: List[ScriptSegment]) -> Dict[str, Any]: """ 获取切分摘要信息 Args: segments: 片段列表 Returns: 摘要信息字典 """ if not segments: return { "total_segments": 0, "total_length": 0, "split_methods": {} } total_length = sum(len(s.content) for s in segments) split_methods = {} for segment in segments: method = segment.metadata.get("split_method", "unknown") split_methods[method] = split_methods.get(method, 0) + 1 return { "total_segments": len(segments), "total_length": total_length, "average_length": total_length // len(segments), "min_length": min(len(s.content) for s in segments), "max_length": max(len(s.content) for s in segments), "split_methods": split_methods, "has_scenes": any(s.scene_marker for s in segments) } def split_script( content: str, max_segment_length: int = ScriptSplitter.MAX_SEGMENT_LENGTH, overlap_length: int = ScriptSplitter.OVERLAP_LENGTH, chunk_threshold: int = ScriptSplitter.CHUNK_THRESHOLD ) -> Dict[str, Any]: """ 便捷函数:切分剧本内容 Args: content: 剧本内容 max_segment_length: 单片段最大长度 overlap_length: 切分时的重叠长度 chunk_threshold: 超过此长度开始切分 Returns: 包含片段列表和摘要的字典 """ splitter = ScriptSplitter( max_segment_length=max_segment_length, overlap_length=overlap_length, chunk_threshold=chunk_threshold ) segments = splitter.split(content) summary = splitter.get_split_summary(segments) # 转换为可序列化的格式 segments_data = [ { "index": s.index, "content": s.content, "start_pos": s.start_pos, "end_pos": s.end_pos, "scene_marker": s.scene_marker, "length": len(s.content), "metadata": s.metadata } for s in segments ] return { "segments": segments_data, "summary": summary }