Obsidian Chat (LI)

Obsidian Chat using LlamaIndex

LlamaIndex Links

Name

URL

LlamaIndex Google GenAI Embeddings

https://github.com/run-llama/llama_index/blob/main/docs/docs/examples/embeddings/google_genai.ipynb

Google Embeddings

https://ai.google.dev/gemini-api/docs/embeddings

Using VectorStoreIndex - Guide

https://github.com/run-llama/llama_index/blob/main/docs/docs/module_guides/indexing/vector_store_index.md

Gemini Model variants

https://ai.google.dev/gemini-api/docs/models/gemini#model-variations

import streamlit as st
import os
import json
from pathlib import Path

from llama_index.llms.google_genai import GoogleGenAI
from llama_index.embeddings.google_genai import GoogleGenAIEmbedding
from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    Settings,
    StorageContext,
    load_index_from_storage
)

Prints a stylized banner to the console when the application starts.

@st.cache_data
def print_banner():
    print("""
     _____        _____ _           _
    |  _  |      /  __ \\ |         | |
    | | | |______| /  \\/ |__   __ _| |_
    | | | |______| |   | '_ \\ / _` | __|
    \\ \\_/ /      | \\__/\\ | | | (_| | |_
     \\___/        \\____/_| |_|\\__,_|\\__|
    """)
    return 1

print_banner()

Folder to save index

index_folder = "vectors/obsidian"

Select LLM

g_key = os.getenv("GEMINI_API_KEY")

llm_models = [
    "gemini-2.0-flash",
    "gemma-3-27b-it",
]

llm_model = st.sidebar.selectbox(
   "LLM Model",
   llm_models,
)

llm = GoogleGenAI(
    model=llm_model,
    api_key=g_key
)

Settings.llm = llm

Select Embeddings

embed_model_names = [
    "text-embedding-004",
    "gemini-embedding-exp-03-07",
]

embed_model_name = st.sidebar.selectbox(
   "Embedding Model",
   embed_model_names,
)

embed_model = GoogleGenAIEmbedding(
    model_name=embed_model_name,
    embed_batch_size=100,
    api_key=g_key,
)

Settings.embed_model = embed_model

Find Obsidian folder

Find Obsidian folder, which is the first subfolder within the current folder that has a name ending with “ Book”.

current_folder = os.getcwd()
home_folders = os.listdir(current_folder)
book_folders = [item for item in home_folders if os.path.isdir(os.path.join(current_folder, item)) and item.endswith(" Book")]

if (len(book_folders)==0):
    raise Exception('The folder should contain a subfolder with a name that ends with " Book".')

note_home =  book_folders[0]
st.title(note_home)

File change detection based on modified timestamps

timestamp_file = os.path.join(index_folder, ".file_timestamps.json")

def get_all_file_timestamps(folder):
    return {
        str(f): os.path.getmtime(f)
        for f in Path(folder).glob("*.md") if f.is_file()
    }

def load_previous_timestamps():
    if os.path.exists(timestamp_file):
        with open(timestamp_file, "r") as f:
            return json.load(f)
    return {}

def save_timestamps(timestamps):
    with open(timestamp_file, "w") as f:
        json.dump(timestamps, f)

def get_changed_files(current, previous):
    changed = []
    for path, mtime in current.items():
        if path not in previous or previous[path] != mtime:
            changed.append(path)
    return changed

Create Index

Creates a new index from documents and persists it.

def create_index(input_dir, persist_dir):
    reader = SimpleDirectoryReader(
        input_dir=input_dir,
        recursive=False
    )
    documents = reader.load_data()
    st.sidebar.info(f"Creating index from {len(documents)} document(s)...")
    st.session_state.index = VectorStoreIndex.from_documents(documents)
    st.session_state.index.storage_context.persist(persist_dir=persist_dir)
    # print(f"Index created and saved successfully!")

    # Save initial timestamps
    current_timestamps = get_all_file_timestamps(input_dir)
    save_timestamps(current_timestamps)

Loads an existing index and refreshes it with current documents.

def load_and_refresh_index(input_dir, persist_dir):
    # print("Loading existing index...")
    try:
        storage_context = StorageContext.from_defaults(persist_dir=persist_dir)

        st.session_state.index = load_index_from_storage(storage_context)
        # print("Index loaded successfully!")

        # st.info("Checking for document updates...")
        reader = SimpleDirectoryReader(
            input_dir=input_dir,
            recursive=False # Match the setting used during creation
        )
        current_documents = reader.load_data()

        current_timestamps = get_all_file_timestamps(note_home)
        previous_timestamps = load_previous_timestamps()
        changed_files = get_changed_files(current_timestamps, previous_timestamps)

        if changed_files:
            changed_docs = SimpleDirectoryReader(input_files=changed_files).load_data()
            st.session_state.index.refresh_ref_docs(changed_docs) # Pass the newly loaded docs
            st.sidebar.success(f"Refreshed {len(changed_docs)} document(s).")
            st.session_state.index.storage_context.persist(persist_dir=persist_dir)
            save_timestamps(current_timestamps)
            # print("Updated index saved.")
        else:
            st.sidebar.info("No documents changed. Index not updated.")

    except Exception as e:
        st.sidebar.error(f"Error loading or refreshing index: {e}")

Update or create index

if os.path.exists(index_folder):
    if "index" not in st.session_state:
        load_and_refresh_index(note_home, index_folder)

else:
    if st.sidebar.button('Create Index', type='primary', use_container_width=True):
        # Create the index from scratch if it doesn't exist
        #st.sidebar.info("Index not found. Creating a new one...")
        create_index(note_home, index_folder)
    else:
        st.stop()

Query

if "query_engine" not in st.session_state:
    st.session_state.query_engine = st.session_state.index.as_query_engine()

question = st.text_area(f"Question", height=200)

if st.button('Ask', type='primary', use_container_width=True):
    response = st.session_state.query_engine.query(question)
    st.write(response.response)

if st.sidebar.button('Update Index', use_container_width=True):
    del st.session_state['index']
    load_and_refresh_index(note_home, index_folder)
# print("-------------")