Grafana NPE

The CSV file exported from Grafana contains a column named “Line”. Each row in this column is a JSON object that includes a field called “body”.

For each row, extract the value of the “body” field and save it to a separate file named sequentially as 001.log, 002.log, and so on. All these files should be written into a directory whose name is provided as a command-line argument.

Locate the line in the output that contains “java.lang.NullPointerException,” then extract that line and the next five lines.

import csv
import json
import sys
import os
import hashlib
import json
import re
from pathlib import Path
from collections import defaultdict

if len(sys.argv) != 3:
    print("Usage: grafana-npe <input.csv> <output_folder>")
    sys.exit(1)

output_dir = Path(sys.argv[2])
csv_path = Path(output_dir, sys.argv[1])

Separators in file

START_MARKER = "### NullPointerException"
END_MARKER = "### Body"

Extract exceptions into output_dir

def extract_exceptions():
    if not csv_path.exists():
        print(f"CSV file not found: {csv_path}")
        sys.exit(1)

    output_dir.mkdir(parents=True, exist_ok=True)

    count = 0

    with csv_path.open(newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)

        if "Line" not in reader.fieldnames:
            print("CSV does not contain a 'Line' column")
            sys.exit(1)

        for idx, row in enumerate(reader, start=1):
            result = ""

            raw_date = row.get("Date")
            result += raw_date + "\n\n"

            raw_json = row.get("Line", "").strip()
            if not raw_json:
                continue

            try:
                data = json.loads(raw_json)
            except json.JSONDecodeError as e:
                print(f"Skipping row {idx}: invalid JSON ({e})")
                continue

            body = data.get("body")
            if body is not None:
                result += body + "\n\n"

            result += f"### Stacktrace\n\n"
            attributes = data.get("attributes") or {}
            stacktrace = attributes.get("exception.stacktrace")
            if stacktrace:
                result += stacktrace + "\n\n"

            # Find in result the line containing "java.lang.NullPointerException" and extract this line and following 5 lines
            lines = result.splitlines()
            for i, line in enumerate(lines):
                if "java.lang.NullPointerException" in line:
                    extracted = "\n".join(lines[i:i + 6])
                    result = "### NullPointerException\n\n" + extracted + "\n\n### Body\n\n" + result
                    break


            filename = output_dir / f"{count + 1:03d}.log"
            with filename.open("w", encoding="utf-8") as out:
                out.write(result)

            count += 1

    print(f"Extracted {count} files into '{output_dir}'")

Normalize stack trace text so equivalent stacks match more often.

  • strip trailing whitespace

  • collapse multiple blank lines

  • remove variable CGLIB suffixes like $$FastClassBySpringCGLIB$$777f0e43 (keeps the stable prefix)

def normalize_stack(stack_text: str) -> str:
    lines = [ln.rstrip() for ln in stack_text.splitlines()]

    # Normalize dynamic Spring CGLIB class suffixes
    # Example: ApiOrderControllerV2$$FastClassBySpringCGLIB$$777f0e43 -> ApiOrderControllerV2$$FastClassBySpringCGLIB$$<id>
    cg_pattern = re.compile(r"(\$\$FastClassBySpringCGLIB\$\$)[0-9a-fA-F]+")
    lines = [cg_pattern.sub(r"\1<id>", ln) for ln in lines]

    # Collapse multiple blank lines
    out = []
    blank = False
    for ln in lines:
        if ln.strip() == "":
            if not blank:
                out.append("")
            blank = True
        else:
            out.append(ln)
            blank = False

    return "\n".join(out).strip()

Extracts the NPE stack section: from ‘### NullPointerException’ to just before ‘### Body’ Returns None if markers not found.

def extract_stack(text: str) -> str | None:
    start = text.find(START_MARKER)
    if start == -1:
        return None

    end = text.find(END_MARKER, start)
    if end == -1:
        # If no END_MARKER, take everything from start
        stack = text[start:]
    else:
        stack = text[start:end]

    return stack.strip()

Create a stable short key for grouping and display.

def stable_key(normalized_stack: str) -> str:
    h = hashlib.sha1(normalized_stack.encode("utf-8")).hexdigest()
    return h[:12]

Group .log files by NPE stack trace.

def group_exceptions():

    folder = output_dir
    pattern = "*.log" # Glob pattern for log files
    report = None # Optional path to write JSON report (groups).

    if not folder.exists() or not folder.is_dir():
        raise SystemExit(f"Folder not found or not a directory: {folder}")

    files = sorted(folder.glob(pattern))
    if not files:
        raise SystemExit(f"No files matching '{pattern}' in {folder}")

    groups = defaultdict(list)         # key -> [filenames]
    stack_by_key = {}                  # key -> normalized stack
    missing_marker = []                # files without recognizable stack

    for p in files:
        try:
            text = p.read_text(encoding="utf-8", errors="replace")
        except Exception as e:
            print(f"Skipping {p.name}: read error: {e}")
            continue

        stack = extract_stack(text)
        if stack is None:
            missing_marker.append(p.name)
            continue

        norm = normalize_stack(stack)
        key = stable_key(norm)

        groups[key].append(p.name)
        stack_by_key.setdefault(key, norm)

    # Sort groups by size descending
    sorted_groups = sorted(groups.items(), key=lambda kv: len(kv[1]), reverse=True)

    print(f"Scanned folder: {folder}")
    print(f"Log files found: {len(files)}")
    print(f"Grouped by stack: {len(sorted_groups)}")
    if missing_marker:
        print(f"Files missing '{START_MARKER}' marker: {len(missing_marker)}")

    print("\n=== Groups (largest first) ===")
    for i, (key, names) in enumerate(sorted_groups, start=1):
        print(f"\n[{i}] group={key}  count={len(names)}")
        # Print a short "signature" line (top stack frame if present)
        norm_stack = stack_by_key[key]
        sig = next((ln.strip() for ln in norm_stack.splitlines() if ln.strip().startswith("at ")), None)
        if sig:
            print(f"    signature: {sig}")
        print(f"    files: {', '.join(names[:12])}" + (f" ... (+{len(names)-12} more)" if len(names) > 12 else ""))

    if report:
        report = {
            "folder": str(folder),
            "total_files": len(files),
            "missing_marker_files": missing_marker,
            "groups": [
                {
                    "key": key,
                    "count": len(names),
                    "files": names,
                    "normalized_stack": stack_by_key[key],
                }
                for key, names in sorted_groups
            ],
        }
        Path(report).write_text(json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8")
        print(f"\nWrote report: {report}")

main

if __name__ == "__main__":
    extract_exceptions()
    group_exceptions()