Obsidian Chat (LC)

Obsidian Chat using LangChain

import streamlit as st
import os
import json
import hashlib
import tiktoken
from pathlib import Path

Import LangChain

# Report import progress in a temporary sidebar placeholder. The LangChain
# imports are performed inside the status block, one at a time, so that a
# message can be written immediately before each import runs.
placeholder = st.sidebar.empty()
with placeholder.status("Importing langchain...", expanded=True) as status:
    st.write("Importing langchain_openai...")
    from langchain_openai import OpenAIEmbeddings, ChatOpenAI

    st.write("Importing langchain_chroma...")
    from langchain_chroma import Chroma

    st.write("Importing langchain_core.documents...")
    from langchain_core.documents import Document

    st.write("Importing langchain_text_splitters...")
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    st.write("Importing langchain_core.prompts...")
    from langchain_core.prompts import ChatPromptTemplate

    st.write("Importing langchain_core.runnables...")
    from langchain_core.runnables import RunnablePassthrough

    st.write("Importing langchain_core.output_parsers...")
    from langchain_core.output_parsers import StrOutputParser

    st.write("Importing langchain_community.document_loaders...")
    from langchain_community.document_loaders import TextLoader

    # NOTE(review): disabled alternative that would keep a collapsed status
    # widget instead of removing it; `status` is otherwise unused.
    # status.update(label="Obsidian Chat", state="complete", expanded=False)

# All imports are done: clear the placeholder so the progress widget goes away.
placeholder.empty()

Print banner.

# Configure the Streamlit page with the title "O-Chat".
# NOTE(review): this runs after other Streamlit calls above; older Streamlit
# releases required set_page_config to be the first command — confirm the
# minimum supported version.
st.set_page_config(
    page_title="O-Chat"
)

@st.cache_data
def print_banner():
    """Print the O-Chat ASCII-art banner to stdout and return 1.

    The function is wrapped in ``st.cache_data``; presumably this is so the
    banner is printed once rather than on every script rerun, with the
    constant return value serving as the cached result.
    """
    banner = """
     _____        _____ _           _
    |  _  |      /  __ \\ |         | |
    | | | |______| /  \\/ |__   __ _| |_
    | | | |______| |   | '_ \\ / _` | __|
    \\ \\_/ /      | \\__/\\ | | | (_| | |_
     \\___/        \\____/_| |_|\\__,_|\\__|
    """
    print(banner)
    return 1

# Emit the console banner; the return value is not used.
print_banner()

Embeddings and LLM costs

# Embedding price table: model name -> cost in USD per 1M tokens.
embedding_models = {
    "text-embedding-3-small": 0.02,
}

# The active embedding model is the first entry of the table.
EMBEDDING_MODEL, EMBEDDING_COST_PER_1M = next(iter(embedding_models.items()))

# The index folder is keyed by the embedding model name.
index_folder = f"vectors/o-{EMBEDDING_MODEL}"

# Chat-model price table: model name -> (input, output) cost in USD per 1M tokens.
llm_models = {
    "gpt-5.4": (2.50, 15.00),
    "gpt-5.4-mini": (0.75, 4.50),
    "gpt-5.4-nano": (0.20, 1.25),
    "gpt-4o-mini": (0.15, 0.60),
}

# The active chat model is picked by its position in the table above.
LLM_MODEL = tuple(llm_models)[3]
_llm_prices = llm_models[LLM_MODEL]
LLM_INPUT_COST_PER_1M = _llm_prices[0]
LLM_OUTPUT_COST_PER_1M = _llm_prices[1]

# Shared model clients, built once from the selections above: the embedding
# model for the vector index and the chat model (temperature=0) for answers.
embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL)
llm = ChatOpenAI(model=LLM_MODEL, temperature=0)

Find the Obsidian folder: the first subfolder of the current folder whose name ends with “ Book”.

# Locate the Obsidian folder: a direct subfolder of the working directory
# whose name ends with " Book".
current_folder = os.getcwd()
home_folders = os.listdir(current_folder)

# os.listdir returns entries in arbitrary order, so sort the candidates to
# make the chosen folder deterministic when several " Book" folders exist.
book_folders = sorted(
    item
    for item in home_folders
    if item.endswith(" Book")
    and os.path.isdir(os.path.join(current_folder, item))
)

if not book_folders:
    # Nothing to index: tell the user and halt this script run.
    st.error('The folder should contain a subfolder with a name that ends with " Book".')
    st.stop()

# The folder name (relative to the working directory) doubles as the page title.
obsidian_path = book_folders[0]
st.title(obsidian_path)

Helpers

count_tokens(text: str, model: str = 'cl100k_base') -> int
def count_tokens(text: str, model: str = "cl100k_base") -> int:
    """Return the number of tokens for the given text.

    Args:
        text: The text to tokenize.
        model: Name of the tiktoken *encoding* to use (passed to
            ``tiktoken.get_encoding``), despite the parameter name.

    Returns:
        The length of the encoded token sequence.
    """
    enc = tiktoken.get_encoding(model)
    # Notes may literally contain special-token strings such as
    # "<|endoftext|>". By default encode() raises ValueError on those;
    # disabling the check makes them count as ordinary text instead.
    return len(enc.encode(text, disallowed_special=()))
file_md5(path: str) -> str
def file_md5(path: str) -> str:
    """Compute the MD5 hex digest of the file at *path*.

    The file is read in binary mode in 8192-byte blocks, so it is never
    loaded into memory as a whole.
    """
    digest = hashlib.md5()
    with open(path, "rb") as stream:
        # An empty read marks end-of-file and ends the loop.
        while block := stream.read(8192):
            digest.update(block)
    return digest.hexdigest()
load_hashes(index_folder: str) -> dict
# Name of the JSON file, stored inside the index folder, that maps each
# indexed file path to its content hash.
HASH_FILE = ".file_hashes.json"

def load_hashes(index_folder: str) -> dict:
    """Load the previously stored file-hash map.

    Args:
        index_folder: Folder that holds the hash file.

    Returns:
        The stored mapping, or an empty dict when the hash file is missing,
        is not valid JSON (e.g. truncated by an interrupted write), or does
        not contain a JSON object. An empty map makes every file look new,
        so the index is rebuilt instead of the app failing on each run.
    """
    hash_path = os.path.join(index_folder, HASH_FILE)
    try:
        with open(hash_path, "r", encoding="utf-8") as f:
            stored = json.load(f)
    except FileNotFoundError:
        return {}
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError (both are
        # ValueError subclasses): treat a corrupt file as "no hashes".
        return {}
    return stored if isinstance(stored, dict) else {}
save_hashes(index_folder: str, hashes: dict)
def save_hashes(index_folder: str, hashes: dict):
    """Persist the file-hash map.

    Args:
        index_folder: Folder that holds the hash file; created if missing.
        hashes: Mapping of file path to content hash, written as indented JSON.
    """
    os.makedirs(index_folder, exist_ok=True)
    hash_path = os.path.join(index_folder, HASH_FILE)

    # Write to a sibling temp file and swap it in, so an interrupted write
    # cannot leave a truncated hash file behind.
    tmp_path = hash_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(hashes, f, indent=2)
    os.replace(tmp_path, hash_path)
detect_changes(obsidian_path: str, index_folder: str)
def detect_changes(obsidian_path: str, index_folder: str):
    """Diff the markdown files on disk against the stored hash map.

    Every ``*.md`` file found recursively under *obsidian_path* is hashed and
    compared with the map loaded from *index_folder*.

    Returns:
        changed_files: paths whose hash is missing from, or differs from,
            the stored map (new or modified files)
        deleted_files: paths present in the stored map but no longer on disk
        current_hashes: path -> hash map describing the current state
    """
    previous = load_hashes(index_folder)

    # Collect the paths first, then hash each one.
    note_paths = [str(note) for note in Path(obsidian_path).rglob("*.md")]
    current_hashes: dict[str, str] = {
        path: file_md5(path) for path in note_paths
    }

    changed_files = [
        path
        for path, digest in current_hashes.items()
        if previous.get(path) != digest
    ]
    deleted_files = [path for path in previous if path not in current_hashes]

    return changed_files, deleted_files, current_hashes

Core pipeline

load_and_split(file_paths: list[str]) -> list
# Splitter settings handed to RecursiveCharacterTextSplitter.
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

def load_and_split(file_paths: list[str]) -> list:
    """Read the given markdown files as UTF-8 and split them into chunks.

    A file that fails to load is reported with a Streamlit warning and
    skipped; the remaining files are still processed.

    Returns:
        The chunk documents produced by the splitter.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
    )

    loaded = []
    for path in file_paths:
        try:
            loaded += TextLoader(path, encoding="utf-8").load()
        except Exception as exc:
            # Best effort: one unreadable note must not abort the whole run.
            st.warning(f"  ⚠ Skipping {path}: {exc}")

    return splitter.split_documents(loaded)
update_index(obsidian_path: str, index_folder: str, embeddings) -> Chroma
def _remove_chunks_for_sources(vectorstore, sources) -> None:
    """Delete every stored chunk whose ``source`` metadata is in *sources*."""
    wanted = set(sources)
    if not wanted:
        return

    # Only ids and metadata are needed to find stale chunks, so skip fetching
    # the document texts.
    existing = vectorstore.get(include=["metadatas"])
    stale_ids = [
        doc_id
        for doc_id, meta in zip(existing["ids"], existing["metadatas"])
        # A chunk stored without metadata has no source and is never matched.
        if (meta or {}).get("source") in wanted
    ]
    if stale_ids:
        vectorstore.delete(ids=stale_ids)


def update_index(obsidian_path: str, index_folder: str, embeddings) -> Chroma:
    """Create or incrementally update the Chroma vector store.

    Args:
        obsidian_path: Folder scanned recursively for markdown files.
        index_folder: Persistence folder of the vector store and hash file.
        embeddings: Embedding function handed to Chroma.

    Returns:
        The opened vector store, updated so that chunks of deleted files are
        removed and chunks of new/modified files are replaced.
    """
    changed, deleted, current_hashes = detect_changes(obsidian_path, index_folder)

    # Open (or create) the persistent store
    vectorstore = Chroma(
        persist_directory=index_folder,
        embedding_function=embeddings,
        collection_name="obsidian_docs",
    )

    if not changed and not deleted:
        st.sidebar.write("✓ Index is up-to-date — no files changed.")
        return vectorstore

    if deleted:
        st.sidebar.write(f"  Removing {len(deleted)} deleted file(s) from index …")
    if changed:
        st.sidebar.write(f"  Indexing {len(changed)} new/modified file(s) …")

    # Chunks of deleted files and the old chunks of modified files are both
    # stale; remove them in a single pass over the store.
    _remove_chunks_for_sources(vectorstore, [*deleted, *changed])

    if changed:
        chunks = load_and_split(changed)
        if chunks:
            # --- Embedding cost estimation ----------------------------------
            total_embed_tokens = sum(count_tokens(c.page_content) for c in chunks)
            embed_cost = (total_embed_tokens / 1000000) * EMBEDDING_COST_PER_1M
            # Reported in dollars, matching the cost breakdown shown for answers.
            st.sidebar.write(f"  Embedding tokens: {total_embed_tokens:,}  "
                     f"(est. cost ${embed_cost:.6f})")

            vectorstore.add_documents(chunks)

    # Hashes are saved only after the store was updated, so a failed update
    # is retried on the next run.
    save_hashes(index_folder, current_hashes)
    st.sidebar.write("✓ Index updated.")
    return vectorstore
ask(question: str, vectorstore: Chroma, llm) -> str
def ask(question: str, vectorstore: Chroma, llm) -> str:
    """Run a RAG query and write cost estimates to the sidebar.

    Args:
        question: The user's question.
        vectorstore: Vector store to retrieve context chunks from (top 5).
        llm: Chat model that produces the answer.

    Returns:
        The model's answer as a string.

    Token counts come from ``count_tokens`` with its default encoding, so the
    reported costs are estimates.
    """
    retriever = vectorstore.as_retriever(search_kwargs={"k": 5})

    template = """Answer the question based only on the following context.
If you cannot answer from the context, say so.

Context:
{context}

Question: {question}
"""
    prompt = ChatPromptTemplate.from_template(template)

    def format_docs(docs):
        return "\n\n---\n\n".join(d.page_content for d in docs)

    # Retrieve once and reuse the result for both the token estimate and the
    # LLM call. Running the retriever a second time inside the chain would
    # embed the question twice and could estimate a different context than
    # the one actually sent.
    retrieved_docs = retriever.invoke(question)
    context_text = format_docs(retrieved_docs)
    prompt_inputs = {"context": context_text, "question": question}

    full_input = prompt.format(**prompt_inputs)
    input_tokens = count_tokens(full_input)

    answer_chain = prompt | llm | StrOutputParser()
    answer = answer_chain.invoke(prompt_inputs)

    output_tokens = count_tokens(answer)

    input_cost = (input_tokens / 1000000) * LLM_INPUT_COST_PER_1M
    output_cost = (output_tokens / 1000000) * LLM_OUTPUT_COST_PER_1M
    total_cost = input_cost + output_cost

    # Embedding cost for the question itself
    q_embed_tokens = count_tokens(question)
    q_embed_cost = (q_embed_tokens / 1000000) * EMBEDDING_COST_PER_1M

    st.sidebar.write("\n--- Cost Breakdown ---")
    st.sidebar.write(f"  Question embedding : {q_embed_tokens:,} tokens  → ${q_embed_cost:.6f}")
    st.sidebar.write(f"  LLM input          : {input_tokens:,} tokens  → ${input_cost:.6f}")
    st.sidebar.write(f"  LLM output         : {output_tokens:,} tokens  → ${output_cost:.6f}")
    st.sidebar.write(f"  Total LLM answer   : ${total_cost:.6f}")
    st.sidebar.write("----------------------")

    return answer

Main

# Bring the vector index up to date before accepting questions.
vectorstore = update_index(obsidian_path, index_folder, embeddings)

question = st.text_area("Question", height=200)

if st.button('Ask', type='primary', width="stretch"):
    if question.strip():
        response = ask(question, vectorstore, llm)
        st.write(response)
    else:
        # Do not spend embedding/LLM calls on an empty question.
        st.warning("Please enter a question first.")