140 lines
3.5 KiB
Python
140 lines
3.5 KiB
Python
from pathlib import Path
|
|
from pprint import pprint
|
|
import os
|
|
import shutil
|
|
import sys
|
|
root_path = Path(__file__).parent.parent
|
|
sys.path.append(str(root_path))
|
|
|
|
from server.knowledge_base.kb_service.base import KBServiceFactory
|
|
from server import get_kb_path, get_doc_path, KnowledgeFile
|
|
from server.knowledge_base.migrate import folder2db, prune_db_docs, prune_folder_files
|
|
|
|
|
|
# setup test knowledge base
|
|
kb_name = "test_kb_for_migrate"
|
|
test_files = {
|
|
"faq.md": str(root_path / "docs" / "faq.md"),
|
|
"install.md": str(root_path / "docs" / "install.md"),
|
|
}
|
|
|
|
|
|
kb_path = get_kb_path(kb_name)
|
|
doc_path = get_doc_path(kb_name)
|
|
|
|
if not os.path.isdir(doc_path):
|
|
os.makedirs(doc_path)
|
|
|
|
for k, v in test_files.items():
|
|
shutil.copy(v, os.path.join(doc_path, k))
|
|
|
|
|
|
def test_recreate_vs():
|
|
folder2db([kb_name], "recreate_vs")
|
|
|
|
kb = KBServiceFactory.get_service_by_name(kb_name)
|
|
assert kb and kb.exists()
|
|
|
|
files = kb.list_files()
|
|
print(files)
|
|
for name in test_files:
|
|
assert name in files
|
|
path = os.path.join(doc_path, name)
|
|
|
|
# list docs based on file name
|
|
docs = kb.list_docs(file_name=name)
|
|
assert len(docs) > 0
|
|
pprint(docs[0])
|
|
for doc in docs:
|
|
assert doc.metadata["source"] == path
|
|
|
|
# list docs base on metadata
|
|
docs = kb.list_docs(metadata={"source": path})
|
|
assert len(docs) > 0
|
|
|
|
for doc in docs:
|
|
assert doc.metadata["source"] == path
|
|
|
|
|
|
def test_increament():
|
|
kb = KBServiceFactory.get_service_by_name(kb_name)
|
|
kb.clear_vs()
|
|
assert kb.list_files() == []
|
|
assert kb.list_docs() == []
|
|
|
|
folder2db([kb_name], "increament")
|
|
|
|
files = kb.list_files()
|
|
print(files)
|
|
for f in test_files:
|
|
assert f in files
|
|
|
|
docs = kb.list_docs(file_name=f)
|
|
assert len(docs) > 0
|
|
pprint(docs[0])
|
|
|
|
for doc in docs:
|
|
assert doc.metadata["source"] == os.path.join(doc_path, f)
|
|
|
|
|
|
def test_prune_db():
|
|
del_file, keep_file = list(test_files)[:2]
|
|
os.remove(os.path.join(doc_path, del_file))
|
|
|
|
prune_db_docs([kb_name])
|
|
|
|
kb = KBServiceFactory.get_service_by_name(kb_name)
|
|
files = kb.list_files()
|
|
print(files)
|
|
assert del_file not in files
|
|
assert keep_file in files
|
|
|
|
docs = kb.list_docs(file_name=del_file)
|
|
assert len(docs) == 0
|
|
|
|
docs = kb.list_docs(file_name=keep_file)
|
|
assert len(docs) > 0
|
|
pprint(docs[0])
|
|
|
|
shutil.copy(test_files[del_file], os.path.join(doc_path, del_file))
|
|
|
|
|
|
def test_prune_folder():
|
|
del_file, keep_file = list(test_files)[:2]
|
|
kb = KBServiceFactory.get_service_by_name(kb_name)
|
|
|
|
# delete docs for file
|
|
kb.delete_doc(KnowledgeFile(del_file, kb_name))
|
|
files = kb.list_files()
|
|
print(files)
|
|
assert del_file not in files
|
|
assert keep_file in files
|
|
|
|
docs = kb.list_docs(file_name=del_file)
|
|
assert len(docs) == 0
|
|
|
|
docs = kb.list_docs(file_name=keep_file)
|
|
assert len(docs) > 0
|
|
|
|
docs = kb.list_docs(file_name=del_file)
|
|
assert len(docs) == 0
|
|
|
|
assert os.path.isfile(os.path.join(doc_path, del_file))
|
|
|
|
# prune folder
|
|
prune_folder_files([kb_name])
|
|
|
|
# check result
|
|
assert not os.path.isfile(os.path.join(doc_path, del_file))
|
|
assert os.path.isfile(os.path.join(doc_path, keep_file))
|
|
|
|
|
|
def test_drop_kb():
|
|
kb = KBServiceFactory.get_service_by_name(kb_name)
|
|
kb.drop_kb()
|
|
assert not kb.exists()
|
|
assert not os.path.isdir(kb_path)
|
|
|
|
kb = KBServiceFactory.get_service_by_name(kb_name)
|
|
assert kb is None
|