From b28af68a52f706434573468eccecc581817fcabd Mon Sep 17 00:00:00 2001 From: jonathang4 Date: Fri, 12 Sep 2025 20:20:24 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9A=82=E4=B8=8D=E8=83=BD=E5=AE=8C=E6=95=B4?= =?UTF-8?q?=E8=BF=90=E8=A1=8C=20=20=E6=8F=90=E7=A4=BA=E8=AF=8D=E5=B7=B2?= =?UTF-8?q?=E8=B0=83=E6=95=B4=EF=BC=8C=E8=8A=82=E7=82=B9=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E5=BE=85=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/build_bible.py | 41 ++++- agent/episode_create.py | 45 +++++- agent/executor.py | 153 ++++++++++++++++++ agent/scheduler.py | 48 +++--- agent/script_analysis.py | 35 +++- agent/strategic_planning.py | 30 +++- graph/test_agent_graph_1.py | 312 ++++++++++++++++++++++++++++++++---- graph/test_graph_3.py | 171 -------------------- models/session_model.py | 3 - tools/agent/queryDB.py | 157 +++++++++++++++--- 10 files changed, 720 insertions(+), 275 deletions(-) create mode 100644 agent/executor.py delete mode 100644 graph/test_graph_3.py diff --git a/agent/build_bible.py b/agent/build_bible.py index 212819d..755b1b2 100644 --- a/agent/build_bible.py +++ b/agent/build_bible.py @@ -1,5 +1,5 @@ """ -调度智能体 负责接收和分析用户的提示词,并调用智能调度其他智能体来处理工作 +剧本圣经智能体 """ from langgraph.graph import StateGraph @@ -22,13 +22,42 @@ DefaultAgentPrompt = f""" 情绪过山车设计师: 你的剧本就像过山车。开篇即俯冲,5秒一反转,10秒一高潮,结尾必留下一个让人抓心挠肝的钩子。你为观众提供的是极致的情绪体验。 网络梗语言学家: 你的台词充满了网感和“梗”,既能推动剧情,又能引发观众的共鸣和吐槽欲。对话追求高信息密度,不说一句废话。 你的沟通风格:自信、犀利、直击要害,同时又能清晰地解释你每一个改编决策背后的商业逻辑和观众心理。 + # 创作核心风格 (Core Creative Style) - [必须严格遵守的创作铁律] + 你在后续的所有创作中,必须将以下风格作为你的创作DNA: + 人设要极致: 拒绝“普通人”。主角要么是忍辱负重的战神,要么是扮猪吃虎的赘婿,要么是手撕渣男的复仇女王。将一个核心特质放大100倍。 + 情节要密集: 摒弃一切铺垫和过渡。剧情必须像子弹一样密集。一个场景只为一件事服务:制造一个冲突,或给一个爽点。 + 情绪要放大: 羞辱就要当众羞辱,打脸就要发出响声,宠爱就要让全世界都知道。将角色的情绪和行为戏剧化、外放化。 + 对话要戳人: 对白要短、准、狠。多用短句,少用修饰。每一句台词都要么是“金句”,要么是“雷点”,能直接刺激到观众。 + 目标要明确: 牢记短剧的核心是**“情绪商品的售卖”**。你的每一个情节设计,都要服务于最终的完播率和付费率。 + + ***用户的消息中会有完整的`原始剧本`内容 和 `改编思路`内容*** + 你需要根据`原始剧本`和`改编思路`,最终创建`剧本圣经` + `剧本圣经`由下面几个组成部分: + 1 核心大纲 + 2 核心人物小传 + 3 重大事件时间线 + 4 总人物表 + + # 工具使用 + 与用户沟通时如果需要修改`剧本圣经`的内容,可使用下列工具,需要传入修改后的完整内容: + 修改 核心大纲: `ModifyCoreOutline` + 修改 核心人物小传: `ModifyCharacterProfile` + 修改 重大事件时间线: `ModifyCoreEventTimeline` + 修改 总人物表: `ModifyCharacterList` + ***注意:工具使用是需要你调用工具方法的*** + + ***根据用户给你的所有回答内容,你需要分析确认是否需要继续沟通或给出`剧本圣经`的所有内容*** 请严格按照下列JSON结构返回数据,不要有其他任何多余的信息和描述: {{ - "status": "当前阶段的状态",//取值范围在上述 status的描述中 不可写其他值 - "reason":'',//失败原因 成功则为空字符串 - "message":'',//回复给用户的内容 - "node":'',//下一个节点名称 + "type":'沟通',//回复类型: 沟通:需要跟用户确认或继续沟通时的类型;输出:沟通足够最终给出`改编思路`时的类型; + "message":'',//回复给用户的话 + "script_bible":{{//剧本圣经 type为输出时才有值 + "core_outline":'',//核心大纲 + "character_profile":'',//核心人物小传 + "core_event_timeline":'',//重大事件时间线 + "character_list":'',//总人物表 + }}, }} """ @@ -61,5 +90,5 @@ class BuildBibleAgent(CompiledStateGraph): return create_react_agent( model=llm, tools=tools, - prompt=create_agent_prompt(prompt=DefaultAgentPrompt, SchedulerList=SchedulerList), + prompt=DefaultAgentPrompt, ) \ No newline at end of file diff --git a/agent/episode_create.py b/agent/episode_create.py index b01b847..d9e00f9 100644 --- a/agent/episode_create.py +++ b/agent/episode_create.py @@ -1,5 +1,5 @@ """ -调度智能体 负责接收和分析用户的提示词,并调用智能调度其他智能体来处理工作 +剧集创作智能体 单次分批创作3-5集内容 """ from langgraph.graph import StateGraph @@ -22,13 +22,46 @@ DefaultAgentPrompt = f""" 情绪过山车设计师: 你的剧本就像过山车。开篇即俯冲,5秒一反转,10秒一高潮,结尾必留下一个让人抓心挠肝的钩子。你为观众提供的是极致的情绪体验。 网络梗语言学家: 你的台词充满了网感和“梗”,既能推动剧情,又能引发观众的共鸣和吐槽欲。对话追求高信息密度,不说一句废话。 你的沟通风格:自信、犀利、直击要害,同时又能清晰地解释你每一个改编决策背后的商业逻辑和观众心理。 + # 创作核心风格 (Core Creative Style) - [必须严格遵守的创作铁律] + 你在后续的所有创作中,必须将以下风格作为你的创作DNA: + 人设要极致: 拒绝“普通人”。主角要么是忍辱负重的战神,要么是扮猪吃虎的赘婿,要么是手撕渣男的复仇女王。将一个核心特质放大100倍。 + 情节要密集: 摒弃一切铺垫和过渡。剧情必须像子弹一样密集。一个场景只为一件事服务:制造一个冲突,或给一个爽点。 + 情绪要放大: 羞辱就要当众羞辱,打脸就要发出响声,宠爱就要让全世界都知道。将角色的情绪和行为戏剧化、外放化。 + 对话要戳人: 对白要短、准、狠。多用短句,少用修饰。每一句台词都要么是“金句”,要么是“雷点”,能直接刺激到观众。 + 目标要明确: 牢记短剧的核心是**“情绪商品的售卖”**。你的每一个情节设计,都要服务于最终的完播率和付费率。 + + ***用户的消息中会有完整的`原始剧本摘要`、`改编思路` 和 `剧本圣经`,以及指定给你的`指定创作集数`*** + `指定创作集数` 是一个剧集编号的数组,比如[1,2,3],表示创作第1集、第2集、第3集的内容 + 你需要根据`原始剧本摘要`、`改编思路`、`剧本圣经`完成所有指定创作集数的内容,并封装成episodes数组返回; + 在创作时,你将依据`改编思路`,对原著素材进行风格化的加工。并随时维护`剧本圣经`的准确性(如更新新角色、记录重大事件)。 + 完成后,向用户提交剧本,并简要报告本次改编是如何体现“改编思路”的,以及所有设定的同步更新情况 + 创作是动态的。如果你在写作中预见到比当前“改编思路”更优的策略,你有权主动提出建议。 + 你会暂停写作,向用户发起沟通,阐述新想法的优势。 + 若用户同意,你将更新`改编思路`及相关的“剧本圣经”,然后基于更新后的版本继续创作,并在返回数据中添加对应的字段,给出更新后的完整内容。 + + ***根据用户给你的所有回答内容,你需要分析确认是否需要继续沟通或给出要求的输出内容*** 请严格按照下列JSON结构返回数据,不要有其他任何多余的信息和描述: {{ - "status": "当前阶段的状态",//取值范围在上述 status的描述中 不可写其他值 - "reason":'',//失败原因 成功则为空字符串 - "message":'',//回复给用户的内容 - "node":'',//下一个节点名称 + "type":'沟通',//回复类型: 沟通:需要跟用户确认或继续沟通时的类型;输出:沟通足够最终给出`改编思路`时的类型; + "message":'',//回复给用户的话 + "adaptation_ideas":'',//`改编思路`内容,在type为`输出`时才会有值 + "script_bible":{{//剧本圣经 只有type=输出时才返回,并且只返回有修改的子项,比如只修改了`核心大纲`和`总人物表`, script_bible中就只有core_outline和character_list两个字段; + "core_outline":'',//核心大纲 + "character_profile":'',//核心人物小传 + "core_event_timeline":'',//重大事件时间线 + "character_list":'',//总人物表 + }}, + "episodes":[ //剧集内容列表 只有type=输出时才返回 + {{ + "number":1, //剧集编号(从1开始),只能是`指定创作集数`中的一个 + "content":'',//单集内容 + }}, + {{ + "number":2, //剧集编号(从1开始),只能是`指定创作集数`中的一个 + "content":'',//单集内容 + }}, + ], }} """ @@ -61,5 +94,5 @@ class EpisodeCreateAgent(CompiledStateGraph): return create_react_agent( model=llm, tools=tools, - prompt=create_agent_prompt(prompt=DefaultAgentPrompt, SchedulerList=SchedulerList), + prompt=DefaultAgentPrompt, ) \ No newline at end of file diff --git a/agent/executor.py b/agent/executor.py new file mode 100644 index 0000000..f9c436a --- /dev/null +++ b/agent/executor.py @@ -0,0 +1,153 @@ +""" +调度智能体 负责接收和分析用户的提示词,并调用智能调度其他智能体来处理工作 +""" + +from langgraph.graph import StateGraph +from langgraph.prebuilt import create_react_agent +from langgraph.graph.state import CompiledStateGraph +from utils.logger import get_logger + +logger = get_logger(__name__) + +# 默认调度器列表 +DefaultSchedulerList = [] + +# 默认代理提示词 +DefaultAgentPrompt = f""" + # 角色 (Persona) + 你不是一个普通的编剧,你是一位在短剧市场身经百战、爆款频出的**“顶级短剧改编专家”与“爆款操盘手”**。 + 你的核心人设与专长: + 极致爽点制造机: 你对观众的“爽点”G点有着鬣狗般的嗅觉。你的天职就是找到、放大、并以最密集的节奏呈现“打脸”、“逆袭”、“揭秘”、“宠溺”等情节。 + 人物标签化大师: 你深知在短剧中,模糊等于无效。你擅长将人物的核心欲望和性格特点极致化、标签化,让观众在3秒内记住主角,5秒内恨上反派。 + 情绪过山车设计师: 你的剧本就像过山车。开篇即俯冲,5秒一反转,10秒一高潮,结尾必留下一个让人抓心挠肝的钩子。你为观众提供的是极致的情绪体验。 + 网络梗语言学家: 你的台词充满了网感和“梗”,既能推动剧情,又能引发观众的共鸣和吐槽欲。对话追求高信息密度,不说一句废话。 + 你的沟通风格:自信、犀利、直击要害,同时又能清晰地解释你每一个改编决策背后的商业逻辑和观众心理。 + + # 任务总体步骤描述 + 1. 查找并确认原始剧本已就绪 + 2. 分析原始剧本得出`诊断与资产评估`,需要用户确认可以继续下一步,否则协助用户完成修改 + 3. 根据`诊断与资产评估`确定`改编思路`,需要用户确认可以继续下一步,否则协助用户完成修改 + 4. 根据`改编思路`生成`剧本圣经`,需要用户确认可以继续下一步,否则协助用户完成修改 + 5. 根据`改编思路`和`剧本圣经`持续剧集创作,单次执行3-5集的创建,直至完成全部剧集。 + 6. 注意步骤具有上下级关系,且不能跳过。但是后续步骤可返回触发前面的任务:如生成单集到第3集后,用户提出要修改某个角色,此时应当返回第4步,并协助用户进行修改与确认;完成修改后重新执行第5步,即从第一集开始重新创作一遍; + + 步骤中对应的阶段如下: + wait_for_input: 等待剧本阶段,查询到`原始剧本`存在并分析到用户确认后进入下一阶段 + script_analysis: 原始剧本分析阶段,查询到`诊断与资产评估`存在并分析到用户确认后进入下一阶段 + strategic_planning: 确立改编目标阶段,查询到`改编思路`存在并分析到用户确认后进入下一阶段 + build_bible: 剧本圣经构建阶段,查询到`剧本圣经`存在并分析到用户确认后进入下一阶段 + episode_create_loop: 剧集创作阶段,查询`剧集创作情况`并分析到已完成所有剧集的创作后进入下一阶段 + finish: 所有剧集创作已完成,用户确认后结束任务,用户需要修改则回退到适合的步骤进行修改并重新执行后续阶段 + + ***除了finish和wait_for_input之外的阶段都需要交给对应的智能体去处理*** + ***episode_create_loop阶段是一个循环阶段,每次循环需要你通过工具方法`剧集创作情况`来判断是否所有剧集都已创作完成,以及需要创作智能体单次创作的集数(通常是3-5集), 该集数为`指定创作集数`,需要添加到返回参数中*** + + # 智能体职责介绍 + ***调度智能体*** 名称:`scheduler` 描述:你自己,需要用户确认反馈时返回自身,并把状态设置成waiting; + ***原始剧本分析 智能体*** 名称:`script_analysis` 描述:构建`诊断与资产评估`;内容包括:故事内核诊断、可继承的宝贵资产(高光情节、神来之笔对白、独特人设闪光点)、以及核心问题与初步改编建议。用户需要对`诊断与资产评估`进行修改都直接交给该智能体; + ***确立改编目标 智能体*** 名称:`strategic_planning` 描述:构建`改编思路`;此文件将作为所有后续改编的最高指导原则。用户需要对`改编思路`进行修改都直接交给该智能体; + ***剧本圣经构建 智能体*** 名称:`build_bible` 描述:构建`剧本圣经`,剧本圣经具体包括了这几个部分:核心大纲, 核心人物小传, 重大事件时间线, 总人物表; 用户需要对`剧本圣经`的每一个部分进行修改都直接交给该智能体; + ***剧集创作 智能体*** 名称:`episode_create` 描述:构建剧集的具体创作;注意该智能体仅负责剧集的创作;对于某一集的具体修改直接交给该智能体; + + ***注意:智能体调用后最终会返回再次请求到你,你需要根据智能体的处理结果来决定下一步*** + ***注意:`智能体调用` 不是工具方法的使用,而是在返回数据中把agent属性指定为要调用的智能体名称*** + + # 工具使用 + 上述智能体职责中提及的输出内容,都有对应的工具可供你调用进行查看;他们的查询工具名称分别对应如下: + 原始剧本是否存在: `QueryOriginalScript` + 诊断与资产评估是否存在: `QueryDiagnosisAndAssessment` + 改编思路是否存在: `QueryAdaptationIdeas` + 剧本圣经是否存在: `QueryScriptBible` + 核心大纲是否存在: `QueryCoreOutline` + 核心人物小传是否存在: `QueryCharacterProfile` + 重大事件时间线是否存在: `QueryCoreEventTimeline` + 总人物表是否存在: `QueryCharacterList` + 剧集创作情况: `QueryEpisodeCount` + + ***注意:工具使用是需要你调用工具方法的;但是大多数情况下,你不需要查询文本的具体内容,只需要查询存在与否即可*** + + ***每次用户的输入都会携带当前`总任务的进度与任务状态`,注意查看并分析是否应该回复用户等待或提醒用户确认继续下一步*** + # 总任务的进度与任务状态数据结构为 {{"step": "waiting_script", "status": "running", "from_type":"user", "reason": "waiting_script", "retry_count": 0, "query_args":{{}}}} + + step: 阶段名称 + wait_for_input: 等待用户提供原始剧本 + script_analysis: 原始剧本分析 + strategic_planning: 确立改编目标 + build_bible: 剧本圣经构建 + episode_create_loop: 剧集创作 + finish: 所有剧集创作完成 + + status: 当前阶段的状态 + waiting: 等待用户反馈、确认 + running: 进行中 + failed: 失败 + completed: 完成 + + "from_type": 本次请求来着哪里 + user: 用户 + agent: 智能体返回 + + "reason": 失败原因,仅在`status`为`failed`时返回 + 字符串内容 + + "retry_count": 重试次数 + + "query_args": 用于调用工具方法的参数,可能包括的字段有: + "session_id": 会话ID,可用于查询`原始剧本` + + # 职责 + 分析用户输入与`总任务的进度与任务状态`,以下是几种情况的示例: + 1 `wait_for_input` 向用户问好,并介绍你作为“爆款短剧操盘手”的身份和专业工作流程,礼貌地请用户提供需要改编的原始剧本。如果用户没有提供原始剧本,你将持续友好地提醒,此时状态始终为waiting,直到获取原始剧本为止。从用户提交的中可以获取到session_id的时候,需要调用 `QueryOriginalScript` 工具来查询原始剧本是否存在。 + 2 `script_analysis` 读取到原始剧本并从输入中分析出可以继续后进入,调用`原始剧本分析 智能体`继续后续工作;running时,礼貌回复用户并提醒用户任务真正进行中;completed代表任务完成,此时可等待用户反馈;直到跟用户确认可以进行下一步后再继续后续任务; + 3 `strategic_planning` 根据`诊断与资产评估`的结果,调用`确立改编目标 智能体`,并返回结果。 + 4 `build_bible` 根据`改编思路`的结果,调用`剧本圣经构建 智能体`,并返回结果。 + 5 `episode_create_loop` 根据`剧本圣经`的结果,调用`剧集创作 智能体` + 5 `finish` 所有剧集完成后设置为该状态,但是不要返回node==end_node,因为用户还可以继续输入来进一步修改产出内容; + + ***当任意一个智能体返回失败时,你需要分析reason字段中的内容,来决定是否进行重试,如果需要重试则给retry_count加1,并交给失败的那个智能体重试一次;如果retry_count超过了3次,或者失败原因不适合重试则反馈给用户说任务失败了,请稍后再试*** + + 请严格按照下列JSON结构返回数据,不要有其他任何多余的信息和描述: + {{ + "step": "阶段名称",//取值范围在上述 step的描述中 不可写其他值 + "status": "当前阶段的状态",//取值范围在上述 status的描述中 不可写其他值 + "agent":'',//分析后得出由哪个智能体继续任务,此处为智能体名称;如果需要继续与用户交互或仅需要回复用户则为空字符串 + "message":'',//回复给用户的内容,注意,仅在你与用户沟通时才返回,其他情况下不返回。比如用户的需求是要交给其他智能体处理时,这个属性应该为空 + "retry_count":0,//重试次数 + "episode_create_num":[1,2,3],//指定创作集数 仅在episode_create_loop阶段会返回,内容是数组,数组中每一项是指定创作的剧集编号(从1开始); + "node":'',//下一个节点名称,根据指定的agent名称,从取值范围列表中选择一个节点名称返回 + }} + +""" + +def create_agent_prompt(prompt, SchedulerList): + """创建代理提示词的辅助函数""" + if not SchedulerList or len(SchedulerList) == 0: return prompt + node_list = [f"{node['name']}:{node['desc']}" for node in SchedulerList] + return f""" + {prompt} \n + 下面返回数据中node字段的取值范围列表([{{名称:描述}}]),请根据你的分析结果选择一个节点名称返回: + {node_list} \n + """ + + +class SchedulerAgent(CompiledStateGraph): + """智能调度智能体类 + + 该类负责接收用户的提示词,并调用其他智能体来处理工作。 + """ + def __new__(cls, llm=None, tools=[], SchedulerList=None): + """创建并返回create_react_agent创建的对象""" + # 处理默认参数 + if llm is None: + from tools.llm.huoshan_langchain import HuoshanChatModel + llm = HuoshanChatModel() + + if SchedulerList is None: + SchedulerList = DefaultSchedulerList + + # 创建并返回代理对象 + return create_react_agent( + model=llm, + tools=tools, + prompt=create_agent_prompt(prompt=DefaultAgentPrompt, SchedulerList=SchedulerList), + ) \ No newline at end of file diff --git a/agent/scheduler.py b/agent/scheduler.py index 14637bb..f9c436a 100644 --- a/agent/scheduler.py +++ b/agent/scheduler.py @@ -28,32 +28,41 @@ DefaultAgentPrompt = f""" 2. 分析原始剧本得出`诊断与资产评估`,需要用户确认可以继续下一步,否则协助用户完成修改 3. 根据`诊断与资产评估`确定`改编思路`,需要用户确认可以继续下一步,否则协助用户完成修改 4. 根据`改编思路`生成`剧本圣经`,需要用户确认可以继续下一步,否则协助用户完成修改 - 5. 根据`改编思路`和`剧本圣经`持续创建单集创作,每创建2-3集后等待用户确认可继续,直至完成全部剧集。 + 5. 根据`改编思路`和`剧本圣经`持续剧集创作,单次执行3-5集的创建,直至完成全部剧集。 6. 注意步骤具有上下级关系,且不能跳过。但是后续步骤可返回触发前面的任务:如生成单集到第3集后,用户提出要修改某个角色,此时应当返回第4步,并协助用户进行修改与确认;完成修改后重新执行第5步,即从第一集开始重新创作一遍; + 步骤中对应的阶段如下: + wait_for_input: 等待剧本阶段,查询到`原始剧本`存在并分析到用户确认后进入下一阶段 + script_analysis: 原始剧本分析阶段,查询到`诊断与资产评估`存在并分析到用户确认后进入下一阶段 + strategic_planning: 确立改编目标阶段,查询到`改编思路`存在并分析到用户确认后进入下一阶段 + build_bible: 剧本圣经构建阶段,查询到`剧本圣经`存在并分析到用户确认后进入下一阶段 + episode_create_loop: 剧集创作阶段,查询`剧集创作情况`并分析到已完成所有剧集的创作后进入下一阶段 + finish: 所有剧集创作已完成,用户确认后结束任务,用户需要修改则回退到适合的步骤进行修改并重新执行后续阶段 + + ***除了finish和wait_for_input之外的阶段都需要交给对应的智能体去处理*** + ***episode_create_loop阶段是一个循环阶段,每次循环需要你通过工具方法`剧集创作情况`来判断是否所有剧集都已创作完成,以及需要创作智能体单次创作的集数(通常是3-5集), 该集数为`指定创作集数`,需要添加到返回参数中*** + # 智能体职责介绍 ***调度智能体*** 名称:`scheduler` 描述:你自己,需要用户确认反馈时返回自身,并把状态设置成waiting; - ***原始剧本分析 智能体*** 名称:`script_analysis` 描述:根据原始剧本分析并输出:`诊断与资产评估`;内容包括:故事内核诊断、可继承的宝贵资产(高光情节、神来之笔对白、独特人设闪光点)、以及核心问题与初步改编建议。用户需要对`诊断与资产评估`进行修改都直接交给该智能体; - ***确立改编目标 智能体*** 名称:`strategic_planning` 描述:用户确认`诊断与资产评估`后,交给该智能体与用户深入沟通,明确改编的具体目标;输出:`改编思路`;此文件将作为所有后续改编的最高指导原则。用户需要对`改编思路`进行修改都直接交给该智能体; - ***剧本圣经构建 智能体*** 名称:`build_bible` 描述:用户确认`改编思路`后,交给该智能体来构建`剧本圣经`,剧本圣经具体包括了这几个部分:核心大纲, 核心人物小传, 重大事件时间线, 总人物表; 用户需要对`剧本圣经`的几个组成部分[核心大纲, 核心人物小传, 重大事件时间线, 总人物表]进行修改都直接交给该智能体; - ***单集创作 智能体*** 名称:`episode_create` 描述:用户确认`剧本圣经`后,交给该智能体来构建某一集的具体创作;注意该智能体仅负责单集的创作,因此该智能体的调度需要有你根据`剧本圣经`中的`核心大纲`来多次调用,逐步完成所有剧集的创作;对于某一集的具体修改直接交给该智能体; + ***原始剧本分析 智能体*** 名称:`script_analysis` 描述:构建`诊断与资产评估`;内容包括:故事内核诊断、可继承的宝贵资产(高光情节、神来之笔对白、独特人设闪光点)、以及核心问题与初步改编建议。用户需要对`诊断与资产评估`进行修改都直接交给该智能体; + ***确立改编目标 智能体*** 名称:`strategic_planning` 描述:构建`改编思路`;此文件将作为所有后续改编的最高指导原则。用户需要对`改编思路`进行修改都直接交给该智能体; + ***剧本圣经构建 智能体*** 名称:`build_bible` 描述:构建`剧本圣经`,剧本圣经具体包括了这几个部分:核心大纲, 核心人物小传, 重大事件时间线, 总人物表; 用户需要对`剧本圣经`的每一个部分进行修改都直接交给该智能体; + ***剧集创作 智能体*** 名称:`episode_create` 描述:构建剧集的具体创作;注意该智能体仅负责剧集的创作;对于某一集的具体修改直接交给该智能体; - ***注意:智能体调用后最终会返回再次请求到你,你需要根据智能体的处理结果来决定下一步*** + ***注意:智能体调用后最终会返回再次请求到你,你需要根据智能体的处理结果来决定下一步*** ***注意:`智能体调用` 不是工具方法的使用,而是在返回数据中把agent属性指定为要调用的智能体名称*** # 工具使用 上述智能体职责中提及的输出内容,都有对应的工具可供你调用进行查看;他们的查询工具名称分别对应如下: - 原始剧本: `QueryOriginalScript` - 诊断与资产评估: `QueryDiagnosisAndAssessment` - 改编思路: `QueryAdaptationIdeas` - 剧本圣经: `QueryScriptBible` - 核心大纲: `QueryCoreOutline` - 核心人物小传: `QueryCharacterProfile` - 重大事件时间线: `QueryCoreEventTimeline` - 总人物表: `QueryCharacterList` - 单集完整内容: `QuerySingleEpisodeContent` - 未完成的集数: `QueryUnfinishedEpisodeCount` - 已完成的集数: `QueryCompletedEpisodeCount` + 原始剧本是否存在: `QueryOriginalScript` + 诊断与资产评估是否存在: `QueryDiagnosisAndAssessment` + 改编思路是否存在: `QueryAdaptationIdeas` + 剧本圣经是否存在: `QueryScriptBible` + 核心大纲是否存在: `QueryCoreOutline` + 核心人物小传是否存在: `QueryCharacterProfile` + 重大事件时间线是否存在: `QueryCoreEventTimeline` + 总人物表是否存在: `QueryCharacterList` + 剧集创作情况: `QueryEpisodeCount` ***注意:工具使用是需要你调用工具方法的;但是大多数情况下,你不需要查询文本的具体内容,只需要查询存在与否即可*** @@ -92,7 +101,7 @@ DefaultAgentPrompt = f""" 2 `script_analysis` 读取到原始剧本并从输入中分析出可以继续后进入,调用`原始剧本分析 智能体`继续后续工作;running时,礼貌回复用户并提醒用户任务真正进行中;completed代表任务完成,此时可等待用户反馈;直到跟用户确认可以进行下一步后再继续后续任务; 3 `strategic_planning` 根据`诊断与资产评估`的结果,调用`确立改编目标 智能体`,并返回结果。 4 `build_bible` 根据`改编思路`的结果,调用`剧本圣经构建 智能体`,并返回结果。 - 5 `episode_create_loop` 根据`剧本圣经`的结果,调用`单集创作 智能体 + 5 `episode_create_loop` 根据`剧本圣经`的结果,调用`剧集创作 智能体` 5 `finish` 所有剧集完成后设置为该状态,但是不要返回node==end_node,因为用户还可以继续输入来进一步修改产出内容; ***当任意一个智能体返回失败时,你需要分析reason字段中的内容,来决定是否进行重试,如果需要重试则给retry_count加1,并交给失败的那个智能体重试一次;如果retry_count超过了3次,或者失败原因不适合重试则反馈给用户说任务失败了,请稍后再试*** @@ -102,8 +111,9 @@ DefaultAgentPrompt = f""" "step": "阶段名称",//取值范围在上述 step的描述中 不可写其他值 "status": "当前阶段的状态",//取值范围在上述 status的描述中 不可写其他值 "agent":'',//分析后得出由哪个智能体继续任务,此处为智能体名称;如果需要继续与用户交互或仅需要回复用户则为空字符串 - "message":'',//回复给用户的内容 + "message":'',//回复给用户的内容,注意,仅在你与用户沟通时才返回,其他情况下不返回。比如用户的需求是要交给其他智能体处理时,这个属性应该为空 "retry_count":0,//重试次数 + "episode_create_num":[1,2,3],//指定创作集数 仅在episode_create_loop阶段会返回,内容是数组,数组中每一项是指定创作的剧集编号(从1开始); "node":'',//下一个节点名称,根据指定的agent名称,从取值范围列表中选择一个节点名称返回 }} diff --git a/agent/script_analysis.py b/agent/script_analysis.py index a359f9a..69a8016 100644 --- a/agent/script_analysis.py +++ b/agent/script_analysis.py @@ -1,5 +1,5 @@ """ -调度智能体 负责接收和分析用户的提示词,并调用智能调度其他智能体来处理工作 +诊断分析 智能体 负责输出 诊断与资产评估报告 """ from langgraph.graph import StateGraph @@ -22,14 +22,36 @@ DefaultAgentPrompt = f""" 情绪过山车设计师: 你的剧本就像过山车。开篇即俯冲,5秒一反转,10秒一高潮,结尾必留下一个让人抓心挠肝的钩子。你为观众提供的是极致的情绪体验。 网络梗语言学家: 你的台词充满了网感和“梗”,既能推动剧情,又能引发观众的共鸣和吐槽欲。对话追求高信息密度,不说一句废话。 你的沟通风格:自信、犀利、直击要害,同时又能清晰地解释你每一个改编决策背后的商业逻辑和观众心理。 + # 创作核心风格 (Core Creative Style) - [必须严格遵守的创作铁律] + 你在后续的所有创作中,必须将以下风格作为你的创作DNA: + 人设要极致: 拒绝“普通人”。主角要么是忍辱负重的战神,要么是扮猪吃虎的赘婿,要么是手撕渣男的复仇女王。将一个核心特质放大100倍。 + 情节要密集: 摒弃一切铺垫和过渡。剧情必须像子弹一样密集。一个场景只为一件事服务:制造一个冲突,或给一个爽点。 + 情绪要放大: 羞辱就要当众羞辱,打脸就要发出响声,宠爱就要让全世界都知道。将角色的情绪和行为戏剧化、外放化。 + 对话要戳人: 对白要短、准、狠。多用短句,少用修饰。每一句台词都要么是“金句”,要么是“雷点”,能直接刺激到观众。 + 目标要明确: 牢记短剧的核心是**“情绪商品的售卖”**。你的每一个情节设计,都要服务于最终的完播率和付费率。 + + ***用户的消息中会有完整的原始剧本内容*** + + 根据原始剧本内容,你将输出`诊断与资产评估报告`,内容包括:故事内核诊断、可继承的宝贵资产(高光情节、神来之笔对白、独特人设闪光点)、以及核心问题与初步改编建议。在报告完成后,你会将其发送给用户,并寻求用户对分析的认同。 + 如果为读取到原始剧本内容,你应该礼貌地请用户提供需要改编的原始剧本。 + + # 工具使用 + 与用户沟通时如果需要修改`诊断与资产评估报告`的内容,可使用下列工具,需要传入修改后的完整内容: + 修改 核心大纲: `ModifyCoreOutline` + 修改 核心人物小传: `ModifyCharacterProfile` + 修改 重大事件时间线: `ModifyCoreEventTimeline` + 修改 总人物表: `ModifyCharacterList` + ***注意:工具使用是需要你调用工具方法的*** + + ***根据用户给你的所有回答内容,你需要分析确认是否需要继续沟通或给出`诊断与资产评估报告`的所有内容*** 请严格按照下列JSON结构返回数据,不要有其他任何多余的信息和描述: {{ - "status": "当前阶段的状态",//取值范围在上述 status的描述中 不可写其他值 - "reason":'',//失败原因 成功则为空字符串 - "message":'',//回复给用户的内容 - "node":'',//下一个节点名称 + "type":'沟通',//回复类型: 沟通:需要跟用户确认或继续沟通时的类型;输出:沟通足够最终给出`诊断与资产评估报告`时的类型; + "message":'',//回复给用户的话 + "diagnosis_and_assessment":'',//你给出的`诊断与资产评估报告`内容,在type为`输出`时才会有值 }} + """ def create_agent_prompt(prompt, SchedulerList): @@ -61,5 +83,6 @@ class ScriptAnalysisAgent(CompiledStateGraph): return create_react_agent( model=llm, tools=tools, - prompt=create_agent_prompt(prompt=DefaultAgentPrompt, SchedulerList=SchedulerList), + prompt=DefaultAgentPrompt, + # prompt=create_agent_prompt(prompt=DefaultAgentPrompt, SchedulerList=SchedulerList), ) \ No newline at end of file diff --git a/agent/strategic_planning.py b/agent/strategic_planning.py index a2e29b4..d1348f5 100644 --- a/agent/strategic_planning.py +++ b/agent/strategic_planning.py @@ -1,5 +1,5 @@ """ -调度智能体 负责接收和分析用户的提示词,并调用智能调度其他智能体来处理工作 +制定战略蓝图智能体 负责确立改编目标 制定战略蓝图 输出 改编思路 """ from langgraph.graph import StateGraph @@ -22,13 +22,31 @@ DefaultAgentPrompt = f""" 情绪过山车设计师: 你的剧本就像过山车。开篇即俯冲,5秒一反转,10秒一高潮,结尾必留下一个让人抓心挠肝的钩子。你为观众提供的是极致的情绪体验。 网络梗语言学家: 你的台词充满了网感和“梗”,既能推动剧情,又能引发观众的共鸣和吐槽欲。对话追求高信息密度,不说一句废话。 你的沟通风格:自信、犀利、直击要害,同时又能清晰地解释你每一个改编决策背后的商业逻辑和观众心理。 + # 创作核心风格 (Core Creative Style) - [必须严格遵守的创作铁律] + 你在后续的所有创作中,必须将以下风格作为你的创作DNA: + 人设要极致: 拒绝“普通人”。主角要么是忍辱负重的战神,要么是扮猪吃虎的赘婿,要么是手撕渣男的复仇女王。将一个核心特质放大100倍。 + 情节要密集: 摒弃一切铺垫和过渡。剧情必须像子弹一样密集。一个场景只为一件事服务:制造一个冲突,或给一个爽点。 + 情绪要放大: 羞辱就要当众羞辱,打脸就要发出响声,宠爱就要让全世界都知道。将角色的情绪和行为戏剧化、外放化。 + 对话要戳人: 对白要短、准、狠。多用短句,少用修饰。每一句台词都要么是“金句”,要么是“雷点”,能直接刺激到观众。 + 目标要明确: 牢记短剧的核心是**“情绪商品的售卖”**。你的每一个情节设计,都要服务于最终的完播率和付费率。 + + ***用户的消息中会有完整的`原始剧本`内容 和 `诊断与资产评估报告`内容*** + + 根据`原始剧本`和`诊断与资产评估报告`,你将仔细的与用户沟通,确认用户的改编目标, 并最终创建`改编思路` + `改编思路`的内容包含: + 核心改编策略: (例如:“将原著的商战复仇,魔改为‘战神归来,护妻打脸’的极致爽文模式”) + 节奏调整策略: (例如:“合并原著前三章内容至第一集,实现开篇即高潮”) + 人设强化/魔改方向: (例如:“男主增加‘宠妻狂魔’的标签”、“原著白莲花女配改为绿茶反派”) + 爽点前置与增幅计划: (例如:“将后期的‘拍卖会夺魁’情节前置,并增加反派被打脸后的惨状细节”) + 待删除/重大修改的情节: (例如:“删除所有与主线无关的日常支线”) + + ***根据用户给你的所有回答内容,你需要分析确认是否需要继续沟通或给出`改编思路`的所有内容*** 请严格按照下列JSON结构返回数据,不要有其他任何多余的信息和描述: {{ - "status": "当前阶段的状态",//取值范围在上述 status的描述中 不可写其他值 - "reason":'',//失败原因 成功则为空字符串 - "message":'',//回复给用户的内容 - "node":'',//下一个节点名称 + "type":'沟通',//回复类型: 沟通:需要跟用户确认或继续沟通时的类型;输出:沟通足够最终给出`改编思路`时的类型; + "message":'',//回复给用户的话 + "adaptation_ideas":'',//`改编思路`内容,在type为`输出`时才会有值 }} """ @@ -61,5 +79,5 @@ class StrategicPlanningAgent(CompiledStateGraph): return create_react_agent( model=llm, tools=tools, - prompt=create_agent_prompt(prompt=DefaultAgentPrompt, SchedulerList=SchedulerList), + prompt=DefaultAgentPrompt, ) \ No newline at end of file diff --git a/graph/test_agent_graph_1.py b/graph/test_agent_graph_1.py index b34ae36..e8a110e 100644 --- a/graph/test_agent_graph_1.py +++ b/graph/test_agent_graph_1.py @@ -26,15 +26,18 @@ from tools.agent.queryDB import QueryOriginalScript logger = get_logger(__name__) -# 定义一个简单的替换函数 +def messages_handler(old_messages, new_messages): + """消息合并方法""" + return new_messages + def replace_value(old_val, new_val): - """一个简单的合并函数,用于替换旧值""" + """值覆盖方法""" return new_val # 状态类型定义 class InputState(TypedDict): """工作流输入状态""" - messages: Annotated[list[AnyMessage], operator.add] + messages: Annotated[list[AnyMessage], messages_handler] from_type: Annotated[str, replace_value] session_id: Annotated[str, replace_value] @@ -57,7 +60,7 @@ class NodeInfo(TypedDict): class ScriptwriterState(TypedDict, total=False): """智能编剧工作流整体状态""" # 输入数据 - messages: Annotated[list[AnyMessage], operator.add] + messages: Annotated[list[AnyMessage], messages_handler] session_id: Annotated[str, replace_value] from_type: Annotated[str, replace_value] # 本次请求来着哪里 [user, agent] @@ -69,10 +72,7 @@ class ScriptwriterState(TypedDict, total=False): workflow_retry_count: Annotated[int, replace_value] # 重试次数 # 中间状态 - agent_script_id: Annotated[str, replace_value] # 剧本ID 包括原文 - agent_plan: Annotated[Dict[str, Any], replace_value] #剧本计划 - script_bible: Annotated[Dict[str, Any], replace_value] #剧本圣经 - episode_list: Annotated[List, replace_value] # 章节列表 完成状态、产出章节id + task_list: Annotated[List[Dict[str, Any]], replace_value] # 顺序执行的任务列表 # 输出数据 agent_message: Annotated[str, replace_value] # 智能体回复 @@ -172,7 +172,7 @@ class ScriptwriterGraph: } ] ) - self.episodeCreate = EpisodeCreateAgent( + self.episodeCreateAgent = EpisodeCreateAgent( tools=[], SchedulerList=[ { @@ -229,20 +229,45 @@ class ScriptwriterGraph: logger.error(f"构建工作流图失败: {e}") raise + def clear_messages(self, messages): + """清除指定会话的所有消息""" + # 剔除历史状态消息 + messages = [message for message in messages if "---任务状态消息(开始)---" not in message.content ] + # HumanMessage 超过 10 条,删除最早的 1 条 + if len([message for message in messages if message.type == 'human']) > 10: + messages = messages[1:] + # SystemMessage 超过 10 条,删除最早的 1 条 + if len([message for message in messages if message.type == 'system']) > 10: + messages = messages[1:] + # AIMessage 超过 10 条,删除最早的 1 条 + if len([message for message in messages if message.type == 'ai']) > 10: + messages = messages[1:] + return messages + # --- 定义图中的节点 --- async def scheduler_node(self, state: ScriptwriterState)-> ScriptwriterState: """调度节点""" try: + status = state.get("status", "") session_id = state.get("session_id", "") from_type = state.get("from_type", "") messages = state.get("messages", []) - + if status == 'failed': + return { + "next_node":'end_node', + "agent_message": state.get("agent_message", ""), + "error": state.get("error", '系统错误,工作流已终止'), + 'status':'failed', + } + # 清除历史状态消息 + messages = self.clear_messages(messages) workflow_step = state.get("workflow_step", "wait_for_input") workflow_status = state.get("workflow_status", "waiting") workflow_reason = state.get("workflow_reason", "") workflow_retry_count = int(state.get("workflow_retry_count", 0)) # 添加参数进提示词 messages.append(HumanMessage(content=f""" + ---任务状态消息(开始)--- # 总任务的进度与任务状态: {{ 'query_args':{{ @@ -254,12 +279,23 @@ class ScriptwriterGraph: 'reason':'{workflow_reason}', 'retry_count':{workflow_retry_count}, }} + ---任务状态消息(结束)--- """)) - logger.info(f"调度节点 {session_id} 输入参数: {messages} from_type:{from_type}") + system_message_count = 0 + human_message_count = 0 + ai_message_count = 0 + for message in messages: + if message.type == 'system': + system_message_count += 1 + elif message.type == 'human': + human_message_count += 1 + elif message.type == 'ai': + ai_message_count += 1 + logger.info(f"调度节点 {session_id} 输入消息条数: {len(messages)} from_type:{from_type} system_message_count:{system_message_count} human_message_count:{human_message_count} ai_message_count:{ai_message_count}") reslut = await self.schedulerAgent.ainvoke(state) ai_message_str = reslut['messages'][-1].content ai_message = json.loads(ai_message_str) - logger.info(f"调度节点结果: {ai_message}") + # logger.info(f"调度节点结果: {ai_message}") step:str = ai_message.get('step', '') status:str = ai_message.get('status', '') next_agent:str = ai_message.get('agent', '') @@ -282,8 +318,8 @@ class ScriptwriterGraph: "agent_message":return_message, } except Exception as e: - import traceback - traceback.print_exc() + # import traceback + # traceback.print_exc() return { "next_node":'end_node', "agent_message": "执行失败", @@ -293,34 +329,244 @@ class ScriptwriterGraph: async def script_analysis_node(self, state: ScriptwriterState)-> ScriptwriterState: """第二步:诊断分析与资产评估""" - print("\n--- 正在进行诊断分析 ---") - session_id = state.get("session_id", "") - print(f"报告已生成: TEST") - return {} - - async def confirm_analysis_node(self, state: ScriptwriterState)-> ScriptwriterState: - """用户确认分析报告节点""" - print("\n等待用户确认分析报告...") - return {} + try: + print("\n------------ 正在进行诊断分析 ------------") + session_id = state.get("session_id", "") + from_type = state.get("from_type", "") + messages = state.get("messages", []) + # 清除历史状态消息 + messages = self.clear_messages(messages) + workflow_step = state.get("workflow_step", "wait_for_input") + workflow_status = state.get("workflow_status", "waiting") + workflow_reason = state.get("workflow_reason", "") + workflow_retry_count = int(state.get("workflow_retry_count", 0)) + # 添加参数进提示词 + messages.append(HumanMessage(content=f""" + ---任务状态消息(开始)--- + # 总任务的进度与任务状态: + {{ + 'query_args':{{ + 'session_id':'{session_id}', + }}, + 'step':'{workflow_step}', + 'status':'{workflow_status}', + 'from_type':'{from_type}', + 'reason':'{workflow_reason}', + 'retry_count':{workflow_retry_count}, + }} + ---任务状态消息(结束)--- + """)) + reslut = await self.scriptAnalysisAgent.ainvoke(state) + ai_message_str = reslut['messages'][-1].content + ai_message = json.loads(ai_message_str) + # logger.info(f"调度节点结果: {ai_message}") + step:str = ai_message.get('step', '') + status:str = ai_message.get('status', '') + next_agent:str = ai_message.get('agent', '') + return_message:str = ai_message.get('message', '') + retry_count:int = int(ai_message.get('retry_count', '0')) + next_node:str = ai_message.get('node', 'pause_node') + # print(f"报告已生成: TEST") + print("\n------------ 诊断分析结束 ------------") + return { + "from_type":'agent', + "next_node":next_node, + "workflow_step":step, + "workflow_status":status, + # "workflow_reason":return_message, + "workflow_retry_count":retry_count, + "agent_message":return_message, + } + except Exception as e: + import traceback + traceback.print_exc() + return { + "next_node":'end_node', + "agent_message": "诊断分析失败", + "error": str(e) or '系统错误,工作流已终止', + 'status':'failed', + } async def strategic_planning_node(self, state: ScriptwriterState)-> ScriptwriterState: """第三步:确立改编目标与战略蓝图""" - print("\n--- 正在制定战略蓝图 ---") - print(f"战略蓝图已生成: TEST") - return {} + try: + print("\n------------ 正在制定战略蓝图 ------------") + session_id = state.get("session_id", "") + from_type = state.get("from_type", "") + messages = state.get("messages", []) + # 清除历史状态消息 + messages = self.clear_messages(messages) + workflow_step = state.get("workflow_step", "wait_for_input") + workflow_status = state.get("workflow_status", "waiting") + workflow_reason = state.get("workflow_reason", "") + workflow_retry_count = int(state.get("workflow_retry_count", 0)) + # 添加参数进提示词 + messages.append(HumanMessage(content=f""" + ---任务状态消息(开始)--- + # 总任务的进度与任务状态: + {{ + 'query_args':{{ + 'session_id':'{session_id}', + }}, + 'step':'{workflow_step}', + 'status':'{workflow_status}', + 'from_type':'{from_type}', + 'reason':'{workflow_reason}', + 'retry_count':{workflow_retry_count}, + }} + ---任务状态消息(结束)--- + """)) + reslut = await self.strategicPlanningAgent.ainvoke(state) + ai_message_str = reslut['messages'][-1].content + ai_message = json.loads(ai_message_str) + # logger.info(f"调度节点结果: {ai_message}") + step:str = ai_message.get('step', '') + status:str = ai_message.get('status', '') + next_agent:str = ai_message.get('agent', '') + return_message:str = ai_message.get('message', '') + retry_count:int = int(ai_message.get('retry_count', '0')) + next_node:str = ai_message.get('node', 'pause_node') + # print(f"报告已生成: TEST") + print("\n------------ 制定战略蓝图结束 ------------") + return { + "from_type":'agent', + "next_node":next_node, + "workflow_step":step, + "workflow_status":status, + # "workflow_reason":return_message, + "workflow_retry_count":retry_count, + "agent_message":return_message, + } + except Exception as e: + import traceback + traceback.print_exc() + return { + "next_node":'end_node', + "agent_message": "制定战略蓝图失败", + "error": str(e) or '系统错误,工作流已终止', + 'status':'failed', + } async def build_bible_node(self, state: ScriptwriterState)-> ScriptwriterState: - """第四步:确立改编目标与战略蓝图""" - print("\n--- 正在制定战略蓝图 ---") - print(f"战略蓝图已生成: TEST") - return {} + """第四步:制定剧本圣经""" + try: + print("\n------------ 正在制定剧本圣经 ------------") + session_id = state.get("session_id", "") + from_type = state.get("from_type", "") + messages = state.get("messages", []) + # 清除历史状态消息 + messages = self.clear_messages(messages) + workflow_step = state.get("workflow_step", "wait_for_input") + workflow_status = state.get("workflow_status", "waiting") + workflow_reason = state.get("workflow_reason", "") + workflow_retry_count = int(state.get("workflow_retry_count", 0)) + # 添加参数进提示词 + messages.append(HumanMessage(content=f""" + ---任务状态消息(开始)--- + # 总任务的进度与任务状态: + {{ + 'query_args':{{ + 'session_id':'{session_id}', + }}, + 'step':'{workflow_step}', + 'status':'{workflow_status}', + 'from_type':'{from_type}', + 'reason':'{workflow_reason}', + 'retry_count':{workflow_retry_count}, + }} + ---任务状态消息(结束)--- + """)) + reslut = await self.buildBibleAgent.ainvoke(state) + ai_message_str = reslut['messages'][-1].content + ai_message = json.loads(ai_message_str) + # logger.info(f"调度节点结果: {ai_message}") + step:str = ai_message.get('step', '') + status:str = ai_message.get('status', '') + next_agent:str = ai_message.get('agent', '') + return_message:str = ai_message.get('message', '') + retry_count:int = int(ai_message.get('retry_count', '0')) + next_node:str = ai_message.get('node', 'pause_node') + # print(f"报告已生成: TEST") + print("\n------------ 制定剧本圣经结束 ------------") + return { + "from_type":'agent', + "next_node":next_node, + "workflow_step":step, + "workflow_status":status, + # "workflow_reason":return_message, + "workflow_retry_count":retry_count, + "agent_message":return_message, + } + except Exception as e: + import traceback + traceback.print_exc() + return { + "next_node":'end_node', + "agent_message": "制定剧本圣经失败", + "error": str(e) or '系统错误,工作流已终止', + 'status':'failed', + } async def episode_create_node(self, state: ScriptwriterState)-> ScriptwriterState: """第五步:动态创作与闭环校验(循环主体)""" - num_episodes = 3 # 假设每次创作3集 - episode_list = [] - return {"episode_list": episode_list} + try: + print("\n------------ 正在创作单集内容 ------------") + session_id = state.get("session_id", "") + from_type = state.get("from_type", "") + messages = state.get("messages", []) + # 清除历史状态消息 + messages = self.clear_messages(messages) + workflow_step = state.get("workflow_step", "wait_for_input") + workflow_status = state.get("workflow_status", "waiting") + workflow_reason = state.get("workflow_reason", "") + workflow_retry_count = int(state.get("workflow_retry_count", 0)) + # 添加参数进提示词 + messages.append(HumanMessage(content=f""" + ---任务状态消息(开始)--- + # 总任务的进度与任务状态: + {{ + 'query_args':{{ + 'session_id':'{session_id}', + }}, + 'step':'{workflow_step}', + 'status':'{workflow_status}', + 'from_type':'{from_type}', + 'reason':'{workflow_reason}', + 'retry_count':{workflow_retry_count}, + }} + ---任务状态消息(结束)--- + """)) + reslut = await self.episodeCreateAgent.ainvoke(state) + ai_message_str = reslut['messages'][-1].content + ai_message = json.loads(ai_message_str) + # logger.info(f"调度节点结果: {ai_message}") + step:str = ai_message.get('step', '') + status:str = ai_message.get('status', '') + next_agent:str = ai_message.get('agent', '') + return_message:str = ai_message.get('message', '') + retry_count:int = int(ai_message.get('retry_count', '0')) + next_node:str = ai_message.get('node', 'pause_node') + # print(f"报告已生成: TEST") + print("\n------------ 创作单集内容结束 ------------") + return { + "from_type":'agent', + "next_node":next_node, + "workflow_step":step, + "workflow_status":status, + # "workflow_reason":return_message, + "workflow_retry_count":retry_count, + "agent_message":return_message, + } + except Exception as e: + import traceback + traceback.print_exc() + return { + "next_node":'end_node', + "agent_message": "创作单集内容失败", + "error": str(e) or '系统错误,工作流已终止', + 'status':'failed', + } async def pause_node(self, state: ScriptwriterState)-> ScriptwriterState: """ 暂停节点 处理并完成所有数据状态 """ diff --git a/graph/test_graph_3.py b/graph/test_graph_3.py deleted file mode 100644 index 4a32a1b..0000000 --- a/graph/test_graph_3.py +++ /dev/null @@ -1,171 +0,0 @@ -# test_graph_persistent.py -from operator import add -from typing import TypedDict, Annotated -from langchain_core.messages import AnyMessage, HumanMessage -from langgraph import graph -from langgraph.config import get_stream_writer -from langgraph.constants import END, START -from langgraph.graph import StateGraph -from IPython.display import Image -import uuid -import config -# 导入数据库连接和自定义检查点存储器 -from tools.database.mongo import mainDB, client -# from mongodb_checkpointer import MongoDBCheckpointSaver -from langgraph.checkpoint.mongodb import MongoDBSaver -import asyncio -# collection = db.langgraph_checkpoints -memory: MongoDBSaver = MongoDBSaver(client, db_name=config.MONGO_CHECKPOINT_DB_NAME) - -class State(TypedDict): - messages: Annotated[list[AnyMessage], add] - type: str - -class InputState(TypedDict): - user_input: str - -class OutputState(TypedDict): - graph_output: str - -class OverallState(TypedDict): - foo: str - user_input: str - graph_output: str - -class PrivateState(TypedDict): - bar: str - -async def node_1(state: InputState) -> OverallState: - print(f"Node 1 处理: {state['user_input']}") - return {"foo": state["user_input"] + ">学院"} - -async def node_2(state: OverallState) -> PrivateState: - print(f"Node 2 处理: {state['foo']}") - return {"bar": state["foo"] + ">非常"} - -async def node_3(state: PrivateState) -> OverallState: - print(f"Node 3 处理: {state['bar']}") - return {"graph_output": state["bar"] + ">靠谱"} - -# 创建 MongoDB 检查点存储器 -# checkpointer = MongoDBCheckpointSaver(db.langgraph_checkpoints) - -# 构建图,并添加检查点存储器 -builder = StateGraph(OverallState, input_schema=InputState, output_schema=OutputState) - -builder.add_node('node_1', node_1) -builder.add_node('node_2', node_2) -builder.add_node('node_3', node_3) - -builder.add_edge(START, 'node_1') -builder.add_edge('node_1', 'node_2') -builder.add_edge('node_2', 'node_3') -builder.add_edge('node_3', END) - -# 编译图并添加检查点存储器 -graph = builder.compile(checkpointer=memory) - -async def run_with_persistence(user_input: str, thread_id: str = None): - """运行带持久化的图""" - if thread_id is None: - thread_id = str(uuid.uuid4()) - - print(f"使用线程 ID: {thread_id}") - - # 配置包含线程 ID - config = {"configurable": {"thread_id": thread_id}} - - # 执行图 - input_state = {"user_input": user_input} - output_state = await graph.ainvoke(input_state, config) - - print(f"输出: {output_state}") - return output_state, thread_id - -async def get_checkpoint_history(thread_id: str): - """获取检查点历史""" - config = {"configurable": {"thread_id": thread_id}} - - try: - history_generator = memory.list(config, limit=10) - - print("正在获取检查点历史...") - - # 使用列表推导式或 for 循环来收集所有检查点 - history = list(history_generator) - - print(f"找到 {len(history)} 个检查点:") - - for i, checkpoint_tuple in enumerate(history): - # checkpoint_tuple 包含 config, checkpoint, metadata 等属性 - # print(f" - ID: {checkpoint_tuple}") - checkpoint_data = checkpoint_tuple.checkpoint - metadata = checkpoint_tuple.metadata - print(f"检查点 {i+1}:") - print(f" - ID: {checkpoint_data.get('id', 'N/A')}") - print(f" - 状态: {checkpoint_data.get('channel_values', {})}") - print(f" - 元数据: {metadata}") - print("-" * 50) - - except Exception as e: - print(f"获取历史记录时出错: {e}") - -def resume_from_checkpoint(thread_id: str, checkpoint_id: str = None): - """从检查点恢复执行""" - config = {"configurable": {"thread_id": thread_id}} - - if checkpoint_id: - config["configurable"]["checkpoint_id"] = checkpoint_id - - try: - # 获取 CheckpointTuple 对象 - checkpoint_tuple = memory.get_tuple(config) - - if checkpoint_tuple: - # 直接通过属性访问,而不是解包 - checkpoint_data = checkpoint_tuple.checkpoint - metadata = checkpoint_tuple.metadata - print(f"从检查点恢复:") - print(f" - 检查点 ID: {checkpoint_data.get('id', 'N/A')}") - print(f" - 状态: {checkpoint_data.get('channel_values', {})}") - print(f" - 元数据: {metadata}") - return checkpoint_data.get('channel_values', {}) - else: - print(f"未找到线程 {thread_id} 的检查点") - return None - - except Exception as e: - print(f"恢复检查点时出错: {e}") - return None - -if __name__ == "__main__": - print("=== 测试持久化 LangGraph ===") - - # 第一次运行 - print("\n1. 第一次运行:") - # 由于在异步函数外使用await,需要使用asyncio运行 - output1, thread_id = asyncio.run(run_with_persistence("你好")) - - # 查看检查点历史 - print("\n2. 查看检查点历史:") - asyncio.run(get_checkpoint_history(thread_id)) - - # 从同一线程继续运行 - print("\n3. 从同一线程继续运行:") - output2, _ = asyncio.run(run_with_persistence("再见", thread_id)) - - # 查看更新后的历史 - print("\n4. 查看更新后的历史:") - asyncio.run(get_checkpoint_history(thread_id)) - - # 恢复检查点状态 - print("\n5. 恢复最新检查点状态:") - restored_state = resume_from_checkpoint(thread_id) - - # 可视化图结构(可选) - # try: - # with open('persistent_graph_visualization.png', 'wb') as f: - # f.write(graph.get_graph().draw_mermaid_png()) - # print("\n图片已保存为 persistent_graph_visualization.png") - # except Exception as e: - # print(f"保存可视化图片失败: {e}") \ No newline at end of file diff --git a/models/session_model.py b/models/session_model.py index 56546c7..95c9430 100644 --- a/models/session_model.py +++ b/models/session_model.py @@ -40,7 +40,6 @@ class SessionModel: user_id: str session_type: str = SessionType.SCRIPTWRITING.value title: str = "" - description: str = "" # 原始剧本内容 original_script: str = "" @@ -144,7 +143,6 @@ class SessionModel: 'user_id': self.user_id, 'session_type': self.session_type, 'title': self.title, - 'description': self.description, 'original_script': self.original_script, 'status': self.status, 'current_step': self.current_step, @@ -182,7 +180,6 @@ class SessionModel: user_id=user_id, session_type=data.get('session_type', SessionType.SCRIPTWRITING.value), title=data.get('title', ''), - description=data.get('description', ''), status=data.get('status', SessionStatus.ACTIVE.value), current_step=data.get('current_step', 1), total_steps=data.get('total_steps', 6), diff --git a/tools/agent/queryDB.py b/tools/agent/queryDB.py index db6e7c2..355d82e 100644 --- a/tools/agent/queryDB.py +++ b/tools/agent/queryDB.py @@ -3,36 +3,143 @@ from tools.database.mongo import mainDB from langchain.tools import tool @tool -def QueryOriginalScript(session_id: str, only_exist: bool = False): +def QueryOriginalScript(session_id: str): """ - 查询原始剧本内容或是否存在 + 查询原始剧本内容是否存在 Args: session_id: 会话id - only_exist: 是否只查询存在的剧本 - Returns: Dict: 返回一个包含以下字段的字典: - original_script (str): 原始剧本内容。仅当 only_exist 为 False 时返回该字段。 exist (bool): 原始剧本内容是否存在。 - """ - # c = mainDB.agent_writer_session.count_documents({}) - # print(f"查询到的原始剧本session_id: {session_id}, only_exist: {only_exist} count:{c}") - if only_exist: - script = mainDB.agent_writer_session.find_one({"_id": ObjectId(session_id), "original_script": {"$exists": True, "$ne": ""}}) - # print(f"exist: {script}") - return { - "original_script": "", - "exist": script is not None, - } - else: - script = mainDB.agent_writer_session.find_one({"_id": ObjectId(session_id)}, {"original_script": 1}) - original_script = "" - if script: - original_script = script["original_script"] or '' - print(f"查询到的原始剧本字符长度: {len(original_script)}") - return { - "original_script": original_script, - "exist": original_script != '', - } + script = mainDB.agent_writer_session.find_one({"_id": ObjectId(session_id), "original_script": {"$exists": True, "$ne": ""}},{"_id":1}) + return { + "exist": script is not None, + } + +def QueryDiagnosisAndAssessment(session_id: str): + """ + 查询诊断与资产评估报告是否存在 + Args: + session_id: 会话id + Returns: + Dict: 返回一个包含以下字段的字典: + exist (bool): 诊断与资产评估报告是否存在。 + """ + script = mainDB.agent_writer_session.find_one({"_id": ObjectId(session_id), "diagnosis_and_assessment": {"$exists": True, "$ne": ""}},{"_id":1}) + return { + "exist": script is not None, + } + +def QueryAdaptationIdeas(session_id: str): + """ + 查询改编思路是否存在 + Args: + session_id: 会话id + Returns: + Dict: 返回一个包含以下字段的字典: + exist (bool): 改编思路是否存在。 + """ + script = mainDB.agent_writer_session.find_one({"_id": ObjectId(session_id), "adaptation_ideas": {"$exists": True, "$ne": ""}},{"_id":1}) + return { + "exist": script is not None, + } + +def QueryScriptBible(session_id: str): + """ + 查询剧本圣经是否存在 + Args: + session_id: 会话id + Returns: + Dict: 返回一个包含以下字段的字典: + exist (bool): 剧本圣经是否存在。 + """ + script = mainDB.agent_writer_session.find_one({"_id": ObjectId(session_id), "script_bible": {"$exists": True}},{"_id":1}) + return { + "exist": script is not None, + } + +def QueryCoreOutline(session_id: str): + """ + 查询剧本圣经中的核心大纲是否存在 + Args: + session_id: 会话id + Returns: + Dict: 返回一个包含以下字段的字典: + exist (bool): 剧本圣经中的核心大纲是否存在。 + """ + script = mainDB.agent_writer_session.find_one({"_id": ObjectId(session_id), "script_bible.core_outline": {"$exists": True, "$ne": ""}},{"_id":1}) + return { + "exist": script is not None, + } + +def QueryCharacterProfile(session_id: str): + """ + 查询剧本圣经中的核心人物小传是否存在 + Args: + session_id: 会话id + Returns: + Dict: 返回一个包含以下字段的字典: + exist (bool): 剧本圣经中的核心人物小传是否存在。 + """ + script = mainDB.agent_writer_session.find_one({"_id": ObjectId(session_id), "script_bible.character_profile": {"$exists": True, "$ne": ""}},{"_id":1}) + return { + "exist": script is not None, + } + +def QueryCoreEventTimeline(session_id: str): + """ + 查询剧本圣经中的重大事件时间线是否存在 + Args: + session_id: 会话id + Returns: + Dict: 返回一个包含以下字段的字典: + exist (bool): 剧本圣经中的重大事件时间线是否存在。 + """ + script = mainDB.agent_writer_session.find_one({"_id": ObjectId(session_id), "script_bible.core_event_timeline": {"$exists": True, "$ne": ""}},{"_id":1}) + return { + "exist": script is not None, + } + +def QueryCharacterList(session_id: str): + """ + 查询剧本圣经中的总人物表是否存在 + Args: + session_id: 会话id + Returns: + Dict: 返回一个包含以下字段的字典: + exist (bool): 剧本圣经中的总人物表是否存在。 + """ + script = mainDB.agent_writer_session.find_one({"_id": ObjectId(session_id), "script_bible.character_list": {"$exists": True, "$ne": ""}},{"_id":1}) + return { + "exist": script is not None, + } + +def QueryEpisodeCount(session_id: str): + """ + 查询剧集创作情况 + Args: + session_id: 会话id + Returns: + Dict: 返回一个包含以下字段的字典: + completed (int): 已完成的集数 + total (int): 总集数 + """ + total = mainDB.agent_writer_episodes.count_documents({"session_id": session_id}) + count = mainDB.agent_writer_episodes.count_documents({"session_id": session_id, "content": {"$exists": True, "$ne": ""}}) + return { + "completed": count, + "total": total, + } + +# def QuerySingleEpisodeContent(session_id: str): +# """ +# 查询单集完整内容 +# Args: +# session_id: 会话id +# Returns: +# Dict: 返回一个包含以下字段的字典: +# exist (bool): 剧本圣经中的总人物表是否存在。 +# """ +# pass