Grafana NPE =========== The CSV file exported from Grafana contains a column named "Line". Each row in this column is a JSON object that includes a field called "body". For each row, extract the value of the "body" field and save it to a separate file named sequentially as `001.log`, `002.log`, and so on. All these files should be written into a directory whose name is provided as a command-line argument. Locate the line in the output that contains “java.lang.NullPointerException,” then extract that line and the next five lines. :: import csv import json import sys import os import hashlib import json import re from pathlib import Path from collections import defaultdict if len(sys.argv) != 3: print("Usage: grafana-npe ") sys.exit(1) output_dir = Path(sys.argv[2]) csv_path = Path(output_dir, sys.argv[1]) Separators in file :: START_MARKER = "### NullPointerException" END_MARKER = "### Body" Extract exceptions into output_dir :: def extract_exceptions(): if not csv_path.exists(): print(f"CSV file not found: {csv_path}") sys.exit(1) output_dir.mkdir(parents=True, exist_ok=True) count = 0 with csv_path.open(newline="", encoding="utf-8") as f: reader = csv.DictReader(f) if "Line" not in reader.fieldnames: print("CSV does not contain a 'Line' column") sys.exit(1) for idx, row in enumerate(reader, start=1): result = "" raw_date = row.get("Date") result += raw_date + "\n\n" raw_json = row.get("Line", "").strip() if not raw_json: continue try: data = json.loads(raw_json) except json.JSONDecodeError as e: print(f"Skipping row {idx}: invalid JSON ({e})") continue body = data.get("body") if body is not None: result += body + "\n\n" result += f"### Stacktrace\n\n" attributes = data.get("attributes") or {} stacktrace = attributes.get("exception.stacktrace") if stacktrace: result += stacktrace + "\n\n" # Find in result the line containing "java.lang.NullPointerException" and extract this line and following 5 lines lines = result.splitlines() for i, line in enumerate(lines): if "java.lang.NullPointerException" in line: extracted = "\n".join(lines[i:i + 6]) result = "### NullPointerException\n\n" + extracted + "\n\n### Body\n\n" + result break filename = output_dir / f"{count + 1:03d}.log" with filename.open("w", encoding="utf-8") as out: out.write(result) count += 1 print(f"Extracted {count} files into '{output_dir}'") Normalize stack trace text so equivalent stacks match more often. - strip trailing whitespace - collapse multiple blank lines - remove variable CGLIB suffixes like ``$$FastClassBySpringCGLIB$$777f0e43`` (keeps the stable prefix) :: def normalize_stack(stack_text: str) -> str: lines = [ln.rstrip() for ln in stack_text.splitlines()] # Normalize dynamic Spring CGLIB class suffixes # Example: ApiOrderControllerV2$$FastClassBySpringCGLIB$$777f0e43 -> ApiOrderControllerV2$$FastClassBySpringCGLIB$$ cg_pattern = re.compile(r"(\$\$FastClassBySpringCGLIB\$\$)[0-9a-fA-F]+") lines = [cg_pattern.sub(r"\1", ln) for ln in lines] # Collapse multiple blank lines out = [] blank = False for ln in lines: if ln.strip() == "": if not blank: out.append("") blank = True else: out.append(ln) blank = False return "\n".join(out).strip() Extracts the NPE stack section: from '### NullPointerException' to just before '### Body' Returns None if markers not found. :: def extract_stack(text: str) -> str | None: start = text.find(START_MARKER) if start == -1: return None end = text.find(END_MARKER, start) if end == -1: # If no END_MARKER, take everything from start stack = text[start:] else: stack = text[start:end] return stack.strip() Create a stable short key for grouping and display. :: def stable_key(normalized_stack: str) -> str: h = hashlib.sha1(normalized_stack.encode("utf-8")).hexdigest() return h[:12] Group .log files by NPE stack trace. :: def group_exceptions(): folder = output_dir pattern = "*.log" # Glob pattern for log files report = None # Optional path to write JSON report (groups). if not folder.exists() or not folder.is_dir(): raise SystemExit(f"Folder not found or not a directory: {folder}") files = sorted(folder.glob(pattern)) if not files: raise SystemExit(f"No files matching '{pattern}' in {folder}") groups = defaultdict(list) # key -> [filenames] stack_by_key = {} # key -> normalized stack missing_marker = [] # files without recognizable stack for p in files: try: text = p.read_text(encoding="utf-8", errors="replace") except Exception as e: print(f"Skipping {p.name}: read error: {e}") continue stack = extract_stack(text) if stack is None: missing_marker.append(p.name) continue norm = normalize_stack(stack) key = stable_key(norm) groups[key].append(p.name) stack_by_key.setdefault(key, norm) # Sort groups by size descending sorted_groups = sorted(groups.items(), key=lambda kv: len(kv[1]), reverse=True) print(f"Scanned folder: {folder}") print(f"Log files found: {len(files)}") print(f"Grouped by stack: {len(sorted_groups)}") if missing_marker: print(f"Files missing '{START_MARKER}' marker: {len(missing_marker)}") print("\n=== Groups (largest first) ===") for i, (key, names) in enumerate(sorted_groups, start=1): print(f"\n[{i}] group={key} count={len(names)}") # Print a short "signature" line (top stack frame if present) norm_stack = stack_by_key[key] sig = next((ln.strip() for ln in norm_stack.splitlines() if ln.strip().startswith("at ")), None) if sig: print(f" signature: {sig}") print(f" files: {', '.join(names[:12])}" + (f" ... (+{len(names)-12} more)" if len(names) > 12 else "")) if report: report = { "folder": str(folder), "total_files": len(files), "missing_marker_files": missing_marker, "groups": [ { "key": key, "count": len(names), "files": names, "normalized_stack": stack_by_key[key], } for key, names in sorted_groups ], } Path(report).write_text(json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8") print(f"\nWrote report: {report}") main ---- :: if __name__ == "__main__": extract_exceptions() group_exceptions()