#!/usr/bin/env python3
# Timestamp: "2026-02-09 (ywatanabe)"
# File: /home/ywatanabe/proj/scitex-python/src/scitex/verify/_claim.py
"""Claim layer — link paper assertions to verification chain.
Claims represent specific assertions in manuscripts (statistics, figures,
tables) that can be traced back through the verification chain to source data.
Five claim types:
- statistic: A numerical result (p-value, effect size, etc.)
- figure: A figure reference linked to a recipe/image
- table: A table reference linked to source CSV
- text: A textual assertion linked to computational output
- value: A specific computed value (count, percentage, etc.)
"""
from __future__ import annotations
import json
import os
import re
import sqlite3
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Union
from ._db import get_db
# Canonical claim types
CLAIM_TYPES = ("statistic", "figure", "table", "text", "value")
@dataclass
class Claim:
"""A traceable assertion in a manuscript."""
claim_id: str
file_path: str
line_number: Optional[int]
claim_type: str
claim_value: Optional[str]
source_session: Optional[str]
source_file: Optional[str]
source_hash: Optional[str]
registered_at: Optional[str] = None
verified_at: Optional[str] = None
status: str = "registered"
@property
def location(self) -> str:
"""Human-readable location string."""
if self.line_number:
return f"{self.file_path}:L{self.line_number}"
return self.file_path
def to_dict(self) -> Dict:
return {
"claim_id": self.claim_id,
"file_path": self.file_path,
"line_number": self.line_number,
"claim_type": self.claim_type,
"claim_value": self.claim_value,
"source_session": self.source_session,
"source_file": self.source_file,
"source_hash": self.source_hash,
"registered_at": self.registered_at,
"verified_at": self.verified_at,
"status": self.status,
}
def migrate_add_claims_table(db_path: Path) -> None:
"""Create claims table if not present. Safe to call multiple times."""
conn = sqlite3.connect(str(db_path))
try:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS claims (
id INTEGER PRIMARY KEY AUTOINCREMENT,
claim_id TEXT UNIQUE NOT NULL,
file_path TEXT NOT NULL,
line_number INTEGER,
claim_type TEXT NOT NULL,
claim_value TEXT,
source_session TEXT,
source_file TEXT,
source_hash TEXT,
registered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
verified_at TIMESTAMP,
status TEXT DEFAULT 'registered'
)
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_claims_file ON claims(file_path)")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_claims_source ON claims(source_file)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_claims_session ON claims(source_session)"
)
conn.commit()
finally:
conn.close()
def _generate_claim_id(
file_path: str, line_number: Optional[int], claim_type: str
) -> str:
"""Generate a deterministic claim ID."""
loc = f"{file_path}:L{line_number}" if line_number else file_path
import hashlib
h = hashlib.sha256(f"{loc}:{claim_type}".encode()).hexdigest()[:12]
return f"claim_{h}"
[docs]
def add_claim(
file_path: str,
claim_type: str,
line_number: Optional[int] = None,
claim_value: Optional[str] = None,
source_file: Optional[str] = None,
source_session: Optional[str] = None,
) -> Claim:
"""Register a claim linking a manuscript assertion to the verification chain.
Parameters
----------
file_path : str
Path to the manuscript file (e.g., paper.tex).
claim_type : str
One of: statistic, figure, table, text, value.
line_number : int, optional
Line number in the manuscript.
claim_value : str, optional
The asserted value (e.g., "p = 0.003").
source_file : str, optional
Path to the source file that produced this claim.
source_session : str, optional
Session ID that produced the source.
Returns
-------
Claim
The registered claim object.
"""
if claim_type not in CLAIM_TYPES:
raise ValueError(
f"Invalid claim_type '{claim_type}'. Must be one of: {CLAIM_TYPES}"
)
file_path = str(Path(file_path).resolve())
claim_id = _generate_claim_id(file_path, line_number, claim_type)
# Compute source hash if source_file exists
source_hash = None
if source_file:
source_file = str(Path(source_file).resolve())
source_path = Path(source_file)
if source_path.exists():
from ._hash import hash_file
source_hash = hash_file(source_path)
# Auto-detect source session if not provided
if source_file and not source_session:
db = get_db()
sessions = db.find_session_by_file(source_file, role="output")
if sessions:
source_session = sessions[0]
claim = Claim(
claim_id=claim_id,
file_path=file_path,
line_number=line_number,
claim_type=claim_type,
claim_value=claim_value,
source_session=source_session,
source_file=source_file,
source_hash=source_hash,
)
# Store in database
db = get_db()
_ensure_claims_table(db)
conn = sqlite3.connect(str(db.db_path))
try:
conn.execute(
"""
INSERT OR REPLACE INTO claims
(claim_id, file_path, line_number, claim_type, claim_value,
source_session, source_file, source_hash, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'registered')
""",
(
claim.claim_id,
claim.file_path,
claim.line_number,
claim.claim_type,
claim.claim_value,
claim.source_session,
claim.source_file,
claim.source_hash,
),
)
conn.commit()
finally:
conn.close()
# Auto-export the canonical claims.json so consumers (verifier,
# scitex-writer, human eyes) can read a stable artifact without
# talking to sqlite. Default ON; opt out with
# SCITEX_CLEW_AUTO_EXPORT_CLAIMS=0 if you're streaming thousands of
# claims and the per-call rewrite cost matters. The cost is O(N×K)
# where N is total claims in the DB and K is rewrite size — for
# typical research papers (N < 100, K < 50 KB) it's negligible.
if os.environ.get("SCITEX_CLEW_AUTO_EXPORT_CLAIMS", "1") != "0":
try:
export_claims_json()
except Exception as exc: # noqa: BLE001
# Auto-export is a convenience layer — must not break the
# add_claim primary path if e.g. the runtime/ dir is
# read-only on this host. Log and continue. The user can
# call export_claims_json() explicitly to surface failures.
import warnings as _w
_w.warn(
f"scitex_clew auto-export of claims.json failed "
f"(set SCITEX_CLEW_AUTO_EXPORT_CLAIMS=0 to silence): "
f"{exc!r}",
RuntimeWarning,
stacklevel=2,
)
return claim
[docs]
def export_claims_json(
path: Optional[Union[str, Path]] = None,
*,
file_path_filter: Optional[str] = None,
read_only: bool = True,
) -> Path:
"""Export every registered claim to a canonical JSON artifact.
The exported file is the single human-readable + machine-consumable
view of the claims table in ``db.sqlite``. The DB remains the
source of truth; this JSON is a regenerable artifact.
Path resolution (mirrors :func:`scitex_clew._db._core._default_db_path`)::
1. Explicit ``path`` argument.
2. ``$SCITEX_CLEW_CLAIMS_JSON`` env var (escape hatch).
3. ``<project_root>/.scitex/clew/runtime/claims.json``
(project root = nearest ancestor dir with ``.git`` or
``pyproject.toml``; falls back to cwd if none found).
Parameters
----------
path : str | Path, optional
Override the resolved path. Useful for tests / one-off dumps.
file_path_filter : str, optional
When set, only claims registered against this manuscript file
path are exported. Default: every claim in the DB.
read_only : bool, optional
After writing, ``chmod 0o444`` the file so accidental edits
fail loudly at the OS layer. Default True (the file IS
derived). Set False for tests that need to mutate the file.
Returns
-------
Path
The path the artifact was written to (absolute).
Examples
--------
>>> import scitex_clew as clew
>>> clew.add_claim("paper.tex", "value", 42, "0.94", source_file="r.csv")
>>> # claims.json now auto-exported under ./.scitex/clew/runtime/
>>> clew.export_claims_json() # idempotent — re-emit on demand
PosixPath('.../.scitex/clew/runtime/claims.json')
"""
from ._db import _core as _db_core
if path is None:
env_path = os.environ.get("SCITEX_CLEW_CLAIMS_JSON")
if env_path:
path = Path(env_path)
else:
path = _db_core._default_claims_json_path(
_db_core._find_project_root()
)
path = Path(path).resolve()
path.parent.mkdir(parents=True, exist_ok=True)
claims = list_claims(file_path=file_path_filter, limit=10_000)
payload = {
"_note": (
"AUTO-GENERATED by scitex_clew.export_claims_json() from "
"db.sqlite. Do NOT edit by hand — re-emit by calling "
"scitex_clew.export_claims_json() (default-on after every "
"clew.add_claim()) or by re-running your pipeline."
),
"claims_count": len(claims),
"claims": [c.to_dict() for c in claims],
}
# Clear any pre-existing read-only bit before rewriting.
if path.exists():
try:
path.chmod(0o644)
except OSError:
pass
path.write_text(json.dumps(payload, indent=2, default=str))
if read_only:
try:
path.chmod(0o444)
except OSError:
# Best-effort — on filesystems that don't support unix
# perms (e.g. some Windows mounts) this is a no-op.
pass
return path
[docs]
def list_claims(
file_path: Optional[str] = None,
claim_type: Optional[str] = None,
status: Optional[str] = None,
limit: int = 100,
) -> List[Claim]:
"""List registered claims with optional filters.
Parameters
----------
file_path : str, optional
Filter by manuscript file path.
claim_type : str, optional
Filter by claim type.
status : str, optional
Filter by verification status.
limit : int
Maximum number of claims to return.
Returns
-------
list of Claim
"""
db = get_db()
_ensure_claims_table(db)
query = "SELECT * FROM claims WHERE 1=1"
params = []
if file_path:
file_path = str(Path(file_path).resolve())
query += " AND file_path = ?"
params.append(file_path)
if claim_type:
query += " AND claim_type = ?"
params.append(claim_type)
if status:
query += " AND status = ?"
params.append(status)
query += " ORDER BY file_path, line_number LIMIT ?"
params.append(limit)
conn = sqlite3.connect(str(db.db_path))
conn.row_factory = sqlite3.Row
try:
rows = conn.execute(query, params).fetchall()
return [
Claim(
claim_id=row["claim_id"],
file_path=row["file_path"],
line_number=row["line_number"],
claim_type=row["claim_type"],
claim_value=row["claim_value"],
source_session=row["source_session"],
source_file=row["source_file"],
source_hash=row["source_hash"],
registered_at=row["registered_at"],
verified_at=row["verified_at"],
status=row["status"],
)
for row in rows
]
finally:
conn.close()
[docs]
def verify_claim(claim_id_or_location: str) -> Dict:
"""Verify a specific claim by checking its source against the verification chain.
Parameters
----------
claim_id_or_location : str
Either a claim_id or a location string like "paper.tex:L42".
Returns
-------
dict
Verification result with claim details and chain status.
"""
db = get_db()
_ensure_claims_table(db)
claim = _resolve_claim(claim_id_or_location, db)
if not claim:
return {
"status": "not_found",
"message": f"No claim found for '{claim_id_or_location}'",
}
result = {
"claim": claim.to_dict(),
"source_verified": False,
"chain_verified": False,
"details": [],
}
# Check source file exists and hash matches
if claim.source_file:
source_path = Path(claim.source_file)
if not source_path.exists():
result["details"].append(f"Source file missing: {claim.source_file}")
_update_claim_status(claim.claim_id, "missing", db)
result["claim"]["status"] = "missing"
return result
from ._hash import hash_file
current_hash = hash_file(source_path)
if (
claim.source_hash
and current_hash[: len(claim.source_hash)]
== claim.source_hash[: len(current_hash)]
):
result["source_verified"] = True
result["details"].append("Source file hash matches")
else:
result["details"].append(
f"Source hash mismatch: stored={claim.source_hash}, current={current_hash}"
)
_update_claim_status(claim.claim_id, "mismatch", db)
result["claim"]["status"] = "mismatch"
return result
# Verify the chain if we have a source file
if claim.source_file:
from ._chain import verify_chain
try:
chain = verify_chain(claim.source_file)
result["chain_verified"] = chain.is_verified
if chain.is_verified:
result["details"].append(f"Chain verified ({len(chain.runs)} runs)")
else:
result["details"].append(
f"Chain verification failed ({len(chain.failed_runs)} failed runs)"
)
except Exception as e:
result["details"].append(f"Chain verification error: {e}")
# Update status
if result["source_verified"] and result["chain_verified"]:
_update_claim_status(claim.claim_id, "verified", db)
result["claim"]["status"] = "verified"
elif result["source_verified"]:
_update_claim_status(claim.claim_id, "partial", db)
result["claim"]["status"] = "partial"
return result
def verify_claims_dag(
file_path: Optional[str] = None,
claim_type: Optional[str] = None,
) -> DAGVerification:
"""Build a unified DAG from all claims, tracing each back to its source.
Parameters
----------
file_path : str, optional
Filter claims by manuscript file path.
claim_type : str, optional
Filter claims by type.
Returns
-------
DAGVerification
Unified verification result covering all claim source chains merged.
"""
from ._chain import DAGVerification, VerificationStatus
from ._dag import verify_dag
claims = list_claims(file_path=file_path, claim_type=claim_type)
# Collect unique source files from claims
source_files = []
for c in claims:
if c.source_file and c.source_file not in source_files:
source_files.append(c.source_file)
if not source_files:
return DAGVerification(
target_files=[],
runs=[],
edges=[],
status=VerificationStatus.UNKNOWN,
topological_order=[],
)
return verify_dag(source_files)
def _resolve_claim(identifier: str, db) -> Optional[Claim]:
"""Resolve a claim by ID or location string."""
conn = sqlite3.connect(str(db.db_path))
conn.row_factory = sqlite3.Row
try:
# Try claim_id first
row = conn.execute(
"SELECT * FROM claims WHERE claim_id = ?", (identifier,)
).fetchone()
if not row:
# Try location format: file.tex:L42
match = re.match(r"^(.+):L(\d+)$", identifier)
if match:
fpath = str(Path(match.group(1)).resolve())
line = int(match.group(2))
row = conn.execute(
"SELECT * FROM claims WHERE file_path = ? AND line_number = ?",
(fpath, line),
).fetchone()
if not row:
# Try file path only (returns first match)
fpath = str(Path(identifier).resolve())
row = conn.execute(
"SELECT * FROM claims WHERE file_path = ? ORDER BY line_number LIMIT 1",
(fpath,),
).fetchone()
if row:
return Claim(
claim_id=row["claim_id"],
file_path=row["file_path"],
line_number=row["line_number"],
claim_type=row["claim_type"],
claim_value=row["claim_value"],
source_session=row["source_session"],
source_file=row["source_file"],
source_hash=row["source_hash"],
registered_at=row["registered_at"],
verified_at=row["verified_at"],
status=row["status"],
)
return None
finally:
conn.close()
def _update_claim_status(claim_id: str, status: str, db) -> None:
"""Update claim verification status."""
conn = sqlite3.connect(str(db.db_path))
try:
conn.execute(
"UPDATE claims SET status = ?, verified_at = ? WHERE claim_id = ?",
(status, datetime.now().isoformat(), claim_id),
)
conn.commit()
finally:
conn.close()
def _ensure_claims_table(db) -> None:
"""Ensure the claims table exists (run migration)."""
migrate_add_claims_table(db.db_path)
def format_claims(claims: List[Claim], verbose: bool = False) -> str:
"""Format claims list for terminal display."""
if not claims:
return "No claims registered."
lines = []
status_icons = {
"registered": "\u25cb", # ○
"verified": "\u2713", # ✓
"mismatch": "\u2717", # ✗
"missing": "?",
"partial": "~",
}
for c in claims:
icon = status_icons.get(c.status, "?")
loc = c.location
val = f" = {c.claim_value}" if c.claim_value else ""
lines.append(f" {icon} [{c.claim_type}] {loc}{val}")
if verbose and c.source_file:
src = Path(c.source_file).name
lines.append(
f" source: {src} (session: {c.source_session or 'unknown'})"
)
return "\n".join(lines)
__all__ = [
"CLAIM_TYPES",
"Claim",
"add_claim",
"list_claims",
"verify_claim",
"verify_claims_dag",
"format_claims",
"migrate_add_claims_table",
]