235 lines
8.5 KiB
Python
235 lines
8.5 KiB
Python
## 单独运行的时候需要添加
|
|
# import sys
|
|
# import os
|
|
# sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
|
|
|
|
import re
|
|
import warnings
|
|
from typing import Dict
|
|
|
|
from langchain.callbacks.manager import (
|
|
AsyncCallbackManagerForChainRun,
|
|
CallbackManagerForChainRun,
|
|
)
|
|
from langchain.chains.llm import LLMChain
|
|
from langchain.pydantic_v1 import Extra, root_validator
|
|
from langchain.schema import BasePromptTemplate
|
|
from langchain.schema.language_model import BaseLanguageModel
|
|
from typing import List, Any, Optional
|
|
from langchain.prompts import PromptTemplate
|
|
import sys
|
|
import os
|
|
import json
|
|
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
|
|
from server.chat.knowledge_base_chat import knowledge_base_chat
|
|
from configs import VECTOR_SEARCH_TOP_K, SCORE_THRESHOLD
|
|
|
|
import asyncio
|
|
from server.agent import model_container
|
|
|
|
|
|
async def search_knowledge_base_iter(database: str, query: str) -> str:
    """Query one knowledge base and return the concatenated answer text.

    Calls ``knowledge_base_chat`` in non-streaming mode (``stream=False``) with
    an empty history and a temperature of 0.01, then joins the ``"answer"``
    field of every JSON chunk yielded by the response's ``body_iterator``.

    Args:
        database: Knowledge-base name, forwarded as ``knowledge_base_name``.
        query: The question forwarded to the knowledge base.

    Returns:
        The answer fragments concatenated in arrival order (``""`` when the
        response yields no chunks).

    Raises:
        KeyError: If a chunk has no ``"answer"`` field.
        ValueError: If a chunk is not valid JSON (``json.JSONDecodeError``).
    """
    response = await knowledge_base_chat(
        query=query,
        knowledge_base_name=database,
        model_name=model_container.MODEL.model_name,
        temperature=0.01,
        history=[],
        top_k=VECTOR_SEARCH_TOP_K,
        max_tokens=None,
        prompt_name="knowledge_base_chat",
        score_threshold=SCORE_THRESHOLD,
        stream=False,
    )

    # Every item yielded by body_iterator is a JSON string. Only its "answer"
    # field is used; the "docs" field was previously read but never used, so it
    # is no longer looked up.
    answer_parts = [
        json.loads(raw_chunk)["answer"]
        async for raw_chunk in response.body_iterator
    ]
    return "".join(answer_parts)
|
|
|
|
|
|
# Prompt text (written in Chinese) sent to the LLM. It lists the available
# knowledge bases via {database_names}, carries the user's question via
# {question}, and asks the model to answer with the chosen knowledge-base name
# inside a ```text fenced block followed by an ```output block.
# The doubled braces ("${{...}}") are presumably brace escapes that render as
# literal "${...}" placeholders in the final prompt -- TODO confirm against the
# PromptTemplate format in use.
# NOTE(review): the template asks the model to label its final answer with
# "答案:", while the chain's result parsing in this file looks for "Answer:";
# confirm whether that mismatch is intended.
_PROMPT_TEMPLATE = """
用户会提出一个需要你查询知识库的问题,你应该按照我提供的思想进行思考
Question: ${{用户的问题}}
这些数据库是你能访问的,冒号之前是他们的名字,冒号之后是他们的功能:

{database_names}

你的回答格式应该按照下面的内容,请注意,格式内的```text 等标记都必须输出,这是我用来提取答案的标记。
```text
${{知识库的名称}}
```
```output
数据库查询的结果
```
答案: ${{答案}}

现在,这是我的问题:
问题: {question}

"""
# Module-level prompt object built from the template above; the two declared
# input variables match the {question} and {database_names} slots.
PROMPT = PromptTemplate(
    input_variables=["question", "database_names"],
    template=_PROMPT_TEMPLATE,
)
|
|
|
|
|
|
class LLMKnowledgeChain(LLMChain):
    """Chain that lets an LLM pick a knowledge base and then queries it.

    The wrapped ``llm_chain`` is prompted with the formatted ``database_names``
    and the user's question, and generation is stopped at ``"```output"``.
    If the reply starts with a ```` ```text ```` fenced block, the block's
    content is taken as the knowledge-base name and the original question is
    sent to that knowledge base through ``search_knowledge_base_iter``.
    Otherwise a reply containing ``"Answer:"`` is passed through, and anything
    else yields a "wrong format" message.

    The synchronous and asynchronous entry points (``_call`` / ``_acall``)
    share the same parsing helpers so that both behave identically.

    NOTE(review): the prompt template asks the model to label its answer with
    "答案:", but the fallback parsing here only recognises "Answer:" -- confirm
    whether the Chinese label should be accepted as well.
    """

    llm_chain: LLMChain
    # [Deprecated] LLM wrapper to use.
    llm: Optional[BaseLanguageModel] = None
    # [Deprecated] Prompt to use to translate to python if necessary.
    prompt: BasePromptTemplate = PROMPT
    # Knowledge-base name -> description; rendered into the prompt. The default
    # is read from model_container.DATABASE once, when the class is defined.
    database_names: Dict[str, str] = model_container.DATABASE
    input_key: str = "question"  #: :meta private:
    output_key: str = "answer"  #: :meta private:

    class Config:
        """Configuration for this pydantic object."""

        extra = Extra.forbid
        arbitrary_types_allowed = True

    @root_validator(pre=True)
    def raise_deprecation(cls, values: Dict) -> Dict:
        """Warn about the deprecated ``llm`` argument and build ``llm_chain`` from it.

        When ``llm`` is supplied without ``llm_chain``, an ``LLMChain`` is
        created from ``llm`` and the supplied (or default) prompt.
        """
        if "llm" in values:
            warnings.warn(
                "Directly instantiating an LLMKnowledgeChain with an llm is deprecated. "
                "Please instantiate with llm_chain argument or using the from_llm "
                "class method."
            )
            if "llm_chain" not in values and values["llm"] is not None:
                prompt = values.get("prompt", PROMPT)
                values["llm_chain"] = LLMChain(llm=values["llm"], prompt=prompt)
        return values

    @property
    def input_keys(self) -> List[str]:
        """Expect input key.

        :meta private:
        """
        return [self.input_key]

    @property
    def output_keys(self) -> List[str]:
        """Expect output key.

        :meta private:
        """
        return [self.output_key]

    def _format_database_names(self) -> str:
        """Render ``database_names`` as ``"name":"description"`` lines for the prompt."""
        return ',\n'.join([f' "{k}":"{v}"' for k, v in self.database_names.items()])

    @staticmethod
    def _extract_database(llm_output: str) -> Optional[str]:
        """Return the stripped content of a leading ```text block, or None if absent."""
        text_match = re.search(r"^```text(.*?)```", llm_output, re.DOTALL)
        if text_match is None:
            return None
        return text_match.group(1).strip()

    @staticmethod
    def _extract_direct_answer(llm_output: str) -> Optional[str]:
        """Return an ``"Answer: ..."`` string found in the reply, or None if absent."""
        if llm_output.startswith("Answer:"):
            return llm_output
        if "Answer:" in llm_output:
            # Keep only the text after the last "Answer:" marker.
            return "Answer: " + llm_output.split("Answer:")[-1]
        return None

    def _evaluate_expression(self, dataset, query) -> str:
        """Query knowledge base ``dataset`` with ``query`` from synchronous code.

        Best-effort: any failure is reported as a fixed error message instead
        of raising. ``asyncio.run`` cannot be used while an event loop is
        already running in the current thread, so asynchronous callers must use
        ``_aevaluate_expression`` instead.
        """
        try:
            return asyncio.run(search_knowledge_base_iter(dataset, query))
        except Exception:
            return "输入的信息有误或不存在知识库"

    async def _aevaluate_expression(self, dataset, query) -> str:
        """Asynchronous counterpart of ``_evaluate_expression`` (awaits the search)."""
        try:
            return await search_knowledge_base_iter(dataset, query)
        except Exception:
            return "输入的信息有误或不存在知识库"

    def _process_llm_result(
        self,
        llm_output: str,
        llm_input: str,
        run_manager: CallbackManagerForChainRun,
    ) -> Dict[str, str]:
        """Turn the raw LLM reply into the chain output.

        Args:
            llm_output: Raw text produced by the LLM.
            llm_input: The user's original question, used as the search query.
            run_manager: Callback manager that receives progress text.

        Returns:
            ``{output_key: text}`` where ``text`` is the knowledge-base answer
            prefixed with ``"Answer: "``, a pass-through ``"Answer:"`` reply,
            or a "wrong format" message when neither pattern is found.
        """
        run_manager.on_text(llm_output, color="green", verbose=self.verbose)

        llm_output = llm_output.strip()
        database = self._extract_database(llm_output)
        if database is not None:
            output = self._evaluate_expression(database, llm_input)
            run_manager.on_text("\nAnswer: ", verbose=self.verbose)
            run_manager.on_text(output, color="yellow", verbose=self.verbose)
            return {self.output_key: "Answer: " + output}

        answer = self._extract_direct_answer(llm_output)
        if answer is None:
            return {self.output_key: f"输入的格式不对: {llm_output}"}
        return {self.output_key: answer}

    async def _aprocess_llm_result(
        self,
        llm_output: str,
        llm_input: str,
        run_manager: AsyncCallbackManagerForChainRun,
    ) -> Dict[str, str]:
        """Asynchronous counterpart of ``_process_llm_result``.

        Takes the same arguments as the synchronous version. ``_acall`` already
        passed ``llm_input``, which the previous two-argument signature
        rejected. The search is awaited directly rather than run through
        ``asyncio.run``, and an unrecognised reply yields the same "wrong
        format" message as the synchronous path.
        """
        await run_manager.on_text(llm_output, color="green", verbose=self.verbose)

        llm_output = llm_output.strip()
        database = self._extract_database(llm_output)
        if database is not None:
            output = await self._aevaluate_expression(database, llm_input)
            await run_manager.on_text("\nAnswer: ", verbose=self.verbose)
            await run_manager.on_text(output, color="yellow", verbose=self.verbose)
            return {self.output_key: "Answer: " + output}

        answer = self._extract_direct_answer(llm_output)
        if answer is None:
            return {self.output_key: f"输入的格式不对: {llm_output}"}
        return {self.output_key: answer}

    def _call(
        self,
        inputs: Dict[str, str],
        run_manager: Optional[CallbackManagerForChainRun] = None,
    ) -> Dict[str, str]:
        """Run the chain synchronously for ``inputs[input_key]``."""
        _run_manager = run_manager or CallbackManagerForChainRun.get_noop_manager()
        question = inputs[self.input_key]
        _run_manager.on_text(question)
        llm_output = self.llm_chain.predict(
            database_names=self._format_database_names(),
            question=question,
            # Stop before the model invents the "```output" block itself.
            stop=["```output"],
            callbacks=_run_manager.get_child(),
        )
        return self._process_llm_result(llm_output, question, _run_manager)

    async def _acall(
        self,
        inputs: Dict[str, str],
        run_manager: Optional[AsyncCallbackManagerForChainRun] = None,
    ) -> Dict[str, str]:
        """Run the chain asynchronously for ``inputs[input_key]``."""
        _run_manager = run_manager or AsyncCallbackManagerForChainRun.get_noop_manager()
        question = inputs[self.input_key]
        await _run_manager.on_text(question)
        llm_output = await self.llm_chain.apredict(
            database_names=self._format_database_names(),
            question=question,
            # Stop before the model invents the "```output" block itself.
            stop=["```output"],
            callbacks=_run_manager.get_child(),
        )
        return await self._aprocess_llm_result(llm_output, question, _run_manager)

    @property
    def _chain_type(self) -> str:
        """Identifier string for this chain type."""
        return "llm_knowledge_chain"

    @classmethod
    def from_llm(
        cls,
        llm: BaseLanguageModel,
        prompt: BasePromptTemplate = PROMPT,
        **kwargs: Any,
    ):
        """Build the chain from a language model.

        Args:
            llm: Language model wrapped in an ``LLMChain`` together with ``prompt``.
            prompt: Prompt template for the wrapped chain (defaults to ``PROMPT``).
            **kwargs: Extra fields forwarded to the class constructor.

        Returns:
            A new instance of ``cls`` whose ``llm_chain`` wraps ``llm``.
        """
        llm_chain = LLMChain(llm=llm, prompt=prompt)
        return cls(llm_chain=llm_chain, **kwargs)
|
|
|
|
|
|
def knowledge_search_once(query: str):
    """Answer ``query`` with a single run of a freshly built LLMKnowledgeChain.

    The chain is created from ``model_container.MODEL`` with the module-level
    ``PROMPT`` and verbose output enabled; the value returned by the chain's
    ``run`` call is handed back unchanged.
    """
    chain = LLMKnowledgeChain.from_llm(
        model_container.MODEL,
        verbose=True,
        prompt=PROMPT,
    )
    return chain.run(query)
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: run one sample query and print whatever the chain returns.
    print(knowledge_search_once("大数据的男女比例"))
|