Obsidian Chat (LC)¶
Obsidian Chat using LangChain
import streamlit as st
import os
import json
import hashlib
import tiktoken
from pathlib import Path
Import the LangChain modules lazily, reporting progress in a sidebar status widget.
# Lazily import the LangChain stack while showing a transient status widget
# in the sidebar — these imports can take several seconds on a cold start,
# so each step is reported to the user as it happens.
placeholder = st.sidebar.empty()
with placeholder.status("Importing langchain...", expanded=True) as status:
    st.write("Importing langchain_openai...")
    from langchain_openai import OpenAIEmbeddings, ChatOpenAI
    st.write("Importing langchain_chroma...")
    from langchain_chroma import Chroma
    st.write("Importing langchain_core.documents...")
    from langchain_core.documents import Document
    st.write("Importing langchain_text_splitters...")
    from langchain_text_splitters import RecursiveCharacterTextSplitter
    st.write("Importing langchain_core.prompts...")
    from langchain_core.prompts import ChatPromptTemplate
    st.write("Importing langchain_core.runnables...")
    from langchain_core.runnables import RunnablePassthrough
    st.write("Importing langchain_core.output_parsers...")
    from langchain_core.output_parsers import StrOutputParser
    st.write("Importing langchain_community.document_loaders...")
    from langchain_community.document_loaders import TextLoader
    # status.update(label="Obsidian Chat", state="complete", expanded=False)
# Remove the status widget entirely once all imports have completed.
placeholder.empty()
Print banner.
# Streamlit page setup — must run before any other st.* UI call.
st.set_page_config(
    page_title="O-Chat"
)


@st.cache_data
def print_banner():
    """Write the ASCII-art startup banner to stdout.

    Wrapped in st.cache_data so the banner prints once per session rather
    than on every Streamlit rerun; returns 1 so there is a cacheable value.
    """
    banner = """
 _____        _____ _           _
| _ | / __ \\ | | |
| | | |______| / \\/ |__ __ _| |_
| | | |______| | | '_ \\ / _` | __|
\\ \\_/ / | \\__/\\ | | | (_| | |_
\\___/ \\____/_| |_|\\__,_|\\__|
"""
    print(banner)
    return 1


print_banner()
Embedding and LLM model pricing (USD per 1M tokens)
# Pricing tables — USD per 1M tokens.
embedding_models = {
    "text-embedding-3-small": 0.02,
}
# Only one embedding model is configured; take it by name, not index math.
EMBEDDING_MODEL = next(iter(embedding_models))
EMBEDDING_COST_PER_1M = embedding_models[EMBEDDING_MODEL]
# The vector index is tied to the embedding model that produced it.
index_folder = f"vectors/o-{EMBEDDING_MODEL}"

# (input cost, output cost) — USD per 1M tokens.
llm_models = {
    "gpt-5.4": (2.50, 15.00),
    "gpt-5.4-mini": (0.75, 4.50),
    "gpt-5.4-nano": (0.20, 1.25),
    "gpt-4o-mini": (0.15, 0.60),
}
# Select by explicit name — positional indexing (`[3]`) silently picks the
# wrong model if the table above is reordered or extended.
LLM_MODEL = "gpt-4o-mini"
LLM_INPUT_COST_PER_1M, LLM_OUTPUT_COST_PER_1M = llm_models[LLM_MODEL]

embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL)
llm = ChatOpenAI(model=LLM_MODEL, temperature=0)
Find Obsidian folder, which is the first subfolder within the current folder that has a name ending with “ Book”.
# Locate the Obsidian vault: the first entry of the working directory that
# is a directory and whose name ends with " Book".
current_folder = os.getcwd()
home_folders = os.listdir(current_folder)
book_folders = [
    entry
    for entry in home_folders
    if entry.endswith(" Book") and os.path.isdir(os.path.join(current_folder, entry))
]
if not book_folders:
    st.error('The folder should contain a subfolder with a name that ends with " Book".')
    st.stop()
obsidian_path = book_folders[0]
st.title(obsidian_path)
Helpers¶
- count_tokens(text: str, model: str = 'cl100k_base') int¶
def count_tokens(text: str, model: str = "cl100k_base") -> int:
    """Count the tokens in *text* under the given tiktoken encoding."""
    return len(tiktoken.get_encoding(model).encode(text))
- file_md5(path: str) str¶
def file_md5(path: str) -> str:
    """Return the MD5 hex-digest of the file at *path*, read in 8 KiB chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        while block := handle.read(8192):
            digest.update(block)
    return digest.hexdigest()
- load_hashes(index_folder: str) dict¶
# JSON file (stored inside the index folder) mapping each markdown file path
# to the MD5 digest it had when it was last indexed.
HASH_FILE = ".file_hashes.json"


def load_hashes(index_folder: str) -> dict:
    """Load the previously stored file-hash map.

    Returns an empty dict when the hash file does not exist yet, or when it
    cannot be parsed — a corrupt cache simply triggers a full re-index
    instead of crashing the app.
    """
    hash_path = os.path.join(index_folder, HASH_FILE)
    try:
        with open(hash_path, "r") as f:
            return json.load(f)
    except FileNotFoundError:
        # First run: no hashes recorded yet.
        return {}
    except json.JSONDecodeError:
        # Corrupt/truncated cache file: treat as empty and rebuild.
        return {}
- save_hashes(index_folder: str, hashes: dict)¶
def save_hashes(index_folder: str, hashes: dict):
    """Persist the file-hash map as pretty-printed JSON inside *index_folder*.

    Creates the index folder first if it does not exist yet.
    """
    os.makedirs(index_folder, exist_ok=True)
    destination = os.path.join(index_folder, HASH_FILE)
    with open(destination, "w") as f:
        json.dump(hashes, f, indent=2)
- detect_changes(obsidian_path: str, index_folder: str)¶
def detect_changes(obsidian_path: str, index_folder: str):
    """Compare current markdown files against stored hashes.

    Returns:
        changed_files: list of file paths that are new or modified
        deleted_files: list of file paths that no longer exist
        current_hashes: full hash map for the current state
    """
    previous = load_hashes(index_folder)
    # Hash every markdown file currently in the vault.
    current_hashes: dict[str, str] = {
        str(path): file_md5(str(path))
        for path in Path(obsidian_path).rglob("*.md")
    }
    # New or modified: digest differs from (or is absent in) the stored map.
    changed_files = [
        path
        for path, digest in current_hashes.items()
        if previous.get(path) != digest
    ]
    # Deleted: previously indexed but no longer on disk.
    deleted_files = [path for path in previous if path not in current_hashes]
    return changed_files, deleted_files, current_hashes
Core pipeline¶
- load_and_split(file_paths: list[str]) list¶
# Text-splitter parameters: chunk size and overlap in characters.
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200


def load_and_split(file_paths: list[str]) -> list:
    """Load markdown files and split into chunks.

    Unreadable files are skipped with a sidebar warning rather than aborting
    the whole indexing run.
    """
    documents: list[Document] = []
    for fp in file_paths:
        try:
            documents.extend(TextLoader(fp, encoding="utf-8").load())
        except Exception as e:
            st.warning(f" ⚠ Skipping {fp}: {e}")
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
    )
    return splitter.split_documents(documents)
- update_index(obsidian_path: str, index_folder: str, embeddings) Chroma¶
def _chunk_ids_for_sources(vectorstore: Chroma, sources) -> list:
    """Return the ids of stored chunks whose metadata 'source' is in *sources*."""
    existing = vectorstore.get()
    wanted = set(sources)  # O(1) membership instead of scanning a list per chunk
    return [
        doc_id
        for doc_id, meta in zip(existing["ids"], existing["metadatas"])
        if meta.get("source") in wanted
    ]


def update_index(obsidian_path: str, index_folder: str, embeddings) -> Chroma:
    """Create or incrementally update the Chroma vector store.

    Only files whose MD5 changed are re-embedded; chunks of deleted or
    modified files are removed first. The hash map is persisted at the end
    so the next run sees the new baseline.

    Args:
        obsidian_path: root folder of the Obsidian vault.
        index_folder: persist directory for the Chroma store.
        embeddings: embedding function for the store.

    Returns:
        The opened (and possibly updated) Chroma vector store.
    """
    changed, deleted, current_hashes = detect_changes(obsidian_path, index_folder)
    # Open (or create) the persistent store.
    vectorstore = Chroma(
        persist_directory=index_folder,
        embedding_function=embeddings,
        collection_name="obsidian_docs",
    )
    if not changed and not deleted:
        st.sidebar.write("✓ Index is up-to-date — no files changed.")
        return vectorstore

    # --- handle deletions ---------------------------------------------------
    if deleted:
        st.sidebar.write(f" Removing {len(deleted)} deleted file(s) from index …")
        ids_to_delete = _chunk_ids_for_sources(vectorstore, deleted)
        if ids_to_delete:
            vectorstore.delete(ids=ids_to_delete)

    # --- handle new / modified files ----------------------------------------
    if changed:
        st.sidebar.write(f" Indexing {len(changed)} new/modified file(s) …")
        # Remove old chunks for modified files before re-adding them.
        ids_to_delete = _chunk_ids_for_sources(vectorstore, changed)
        if ids_to_delete:
            vectorstore.delete(ids=ids_to_delete)
        chunks = load_and_split(changed)
        if chunks:
            # --- Embedding cost estimation ----------------------------------
            total_embed_tokens = sum(count_tokens(c.page_content) for c in chunks)
            embed_cost = (total_embed_tokens / 1000000) * EMBEDDING_COST_PER_1M
            # Fixed: the original printed "$<cents> cents", mixing units
            # (dollar sign on a cents value); report plain dollars instead.
            st.sidebar.write(f" Embedding tokens: {total_embed_tokens:,} "
                             f"(est. cost ${embed_cost:.6f})")
            vectorstore.add_documents(chunks)

    save_hashes(index_folder, current_hashes)
    st.sidebar.write("✓ Index updated.")
    return vectorstore
- ask(question: str, vectorstore: Chroma, llm) str¶
def ask(question: str, vectorstore: Chroma, llm) -> str:
    """Run a RAG query and print cost estimates to the sidebar.

    Args:
        question: the user's question.
        vectorstore: Chroma store to retrieve context from.
        llm: chat model used to produce the answer.

    Returns:
        The model's answer string.
    """
    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
    template = """Answer the question based only on the following context.
If you cannot answer from the context, say so.
Context:
{context}
Question: {question}
"""
    prompt = ChatPromptTemplate.from_template(template)

    def format_docs(docs):
        return "\n\n---\n\n".join(d.page_content for d in docs)

    # Retrieve ONCE and reuse the documents for both the cost estimate and
    # the LLM call. The original chain ran retrieval a second time inside
    # `retriever | format_docs`, doubling the work and possibly answering
    # from a different document set than the one the estimate was based on.
    retrieved_docs = retriever.invoke(question)
    context_text = format_docs(retrieved_docs)

    chain = prompt | llm | StrOutputParser()

    full_input = prompt.format(context=context_text, question=question)
    input_tokens = count_tokens(full_input)

    answer = chain.invoke({"context": context_text, "question": question})
    output_tokens = count_tokens(answer)

    input_cost = (input_tokens / 1000000) * LLM_INPUT_COST_PER_1M
    output_cost = (output_tokens / 1000000) * LLM_OUTPUT_COST_PER_1M
    total_cost = input_cost + output_cost
    # Embedding cost for the question itself.
    q_embed_tokens = count_tokens(question)
    q_embed_cost = (q_embed_tokens / 1000000) * EMBEDDING_COST_PER_1M

    st.sidebar.write("\n--- Cost Breakdown ---")
    st.sidebar.write(f" Question embedding : {q_embed_tokens:,} tokens → ${q_embed_cost:.6f}")
    st.sidebar.write(f" LLM input : {input_tokens:,} tokens → ${input_cost:.6f}")
    st.sidebar.write(f" LLM output : {output_tokens:,} tokens → ${output_cost:.6f}")
    st.sidebar.write(f" Total LLM answer : ${total_cost:.6f}")
    st.sidebar.write("----------------------")
    return answer
Main¶
# Build/refresh the index, then answer questions interactively.
vectorstore = update_index(obsidian_path, index_folder, embeddings)
# Fixed: the label was `f"Question"` — an f-string with no placeholders.
question = st.text_area("Question", height=200)
if st.button('Ask', type='primary', width="stretch"):
    # Guard against an empty query so we don't spend retrieval/LLM calls on it.
    if question.strip():
        response = ask(question, vectorstore, llm)
        st.write(response)
    else:
        st.warning("Please enter a question first.")