Files
VIBECODE-THEORY/tools/integrator/integrate.py
T
Mortdecai f654b30de9 docs: integration tools — cross-reference graph, concept index, research digest
Codex-built tooling: cross-reference graph, concept index with build script,
and research integrator that extracted 142 scholars, 175 bibliography items,
4 contradiction topics, and coverage maps for Paper 009 planning.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 08:31:20 -04:00

912 lines
29 KiB
Python

#!/usr/bin/env python3
"""Integrate research markdown files into a unified digest for Paper 009 planning."""
from __future__ import annotations
import argparse
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any
# Markdown ATX heading: 1-6 '#' characters (group 1) followed by the heading
# text (group 2). MULTILINE so that '^'/'$' anchor at every line.
HEADING_RE = re.compile(r"^(#{1,6})\s+(.+?)\s*$", re.MULTILINE)
# Bullet list item introduced by '-' or '*'; group 1 is the item text.
BULLET_RE = re.compile(r"^\s*[-*]\s+(.+?)\s*$")
# Numbered list item such as "3. text"; group 1 is the number, group 2 the text.
NUMBERED_RE = re.compile(r"^\s*(\d+)\.\s+(.+?)\s*$")
# Sentence boundary: whitespace that follows '.', '!' or '?', or a run of newlines.
SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?])\s+|\n+")
# Person-name heuristic: two to four whitespace-separated tokens, each either a
# capitalised word or dotted initials of the form "J.R." / "J.R".
NAME_RE = re.compile(
    r"\b(?:[A-Z](?:\.[A-Z])+\.?|[A-Z][a-zA-Z'-]+)"
    r"(?:\s+(?:[A-Z](?:\.[A-Z])+\.?|[A-Z][a-zA-Z'-]+)){1,3}\b"
)
# Lower-cased words that disqualify a capitalised phrase from being treated as
# a person name (checked word-by-word in split_possible_names).
BAD_NAME_WORDS = {
    "executive",
    "summary",
    "task",
    "sources",
    "paper",
    "physics",
    "technology",
    "society",
    "logs",
    "pricing",
    "history",
    "quantum",
    "analysis",
}
# Contradiction topics consumed by detect_contradictions.
# Each rule has:
#   id           - stable identifier emitted as "topic_id"
#   label        - human-readable topic name emitted as "topic"
#   pro_markers  - phrases taken as evidence supporting the first position
#   con_markers  - phrases taken as evidence challenging it
# Markers are matched as plain substrings of the lower-cased document text,
# so they must be written in lower case.
TOPIC_RULES = [
    {
        "id": "determinism_vs_agency",
        "label": "Technological determinism vs social agency",
        "pro_markers": [
            "autonomous technique",
            "irreversible",
            "lock-in",
            "path dependence",
            "ratchet",
            "structurally fixed",
            "cannot reverse",
        ],
        "con_markers": [
            "social construct",
            "interpretive flexibility",
            "democratic rationalization",
            "human agency",
            "selective adoption",
            "tool taming",
            "re-shaped",
            "can change",
        ],
    },
    {
        "id": "unification_vs_homogenization",
        "label": "Knowledge unification vs statistical homogenization",
        "pro_markers": [
            "knowledge unification",
            "integration layer",
            "interconnectedness",
            "consilience",
            "compiled",
            "coherent",
        ],
        "con_markers": [
            "stochastic parrot",
            "homogenization",
            "illusion",
            "veneer",
            "lossy",
            "lacks understanding",
            "database lookup",
        ],
    },
    {
        "id": "cognition_commodity_vs_mimicry",
        "label": "AI cognition commodity vs token mimicry",
        "pro_markers": [
            "cognition as a commodity",
            "price of thinking",
            "task-based framework",
            "automation",
            "productivity",
            "cognitive offloading",
        ],
        "con_markers": [
            "stochastic parrot",
            "doesn't think",
            "mimicry",
            "predicts tokens",
            "no cognitive model",
        ],
    },
    {
        "id": "retrocausal_attractor",
        "label": "Teleological attractor vs unfalsifiable retrocausality",
        "pro_markers": [
            "teleological attractor",
            "retrocausal",
            "omega point",
            "final cause",
            "participatory universe",
            "transactional interpretation",
        ],
        "con_markers": [
            "unfalsifiability",
            "pseudoscience",
            "woo",
            "causality violation",
            "superdeterminism",
        ],
    },
    {
        "id": "efficiency_vs_jevons",
        "label": "Efficiency frees time vs Jevons expansion",
        "pro_markers": [
            "efficiency gains",
            "free up human time",
            "productivity",
            "surplus",
            "cost disease",
        ],
        "con_markers": [
            "jevons paradox",
            "increased consumption",
            "reasoning inflation",
            "more complex systems",
            "dependency",
        ],
    },
]
# Lower-case phrase -> weight. extract_strongest_challenges adds the weight to
# a challenge bullet's score for every phrase found (as a substring) in it.
CHALLENGE_KEYWORDS = {
    "unfalsifiable": 5,
    "dogma": 4,
    "pseudoscience": 5,
    "illusion": 4,
    "mimicry": 4,
    "lacks understanding": 4,
    "circular": 3,
    "causality violation": 4,
    "superdeterminism": 3,
    "lossy": 2,
    "stochastic parrot": 5,
}
# Open-question number -> lower-case keywords. The number is the list index
# parsed by extract_open_questions; map_to_open_questions counts, per sentence,
# how many of the keywords occur as substrings. Questions with no entry here
# score zero everywhere.
QUESTION_KEYWORDS = {
    1: [
        "falsifiable",
        "falsifiability",
        "unification",
        "replacement",
        "fragment",
        "distort",
        "evidence",
        "test",
        "stochastic",
    ],
    2: [
        "identity",
        "human",
        "consciousness",
        "agency",
        "values",
        "pragmatic",
        "continuity",
        "survival",
    ],
    3: [
        "individual",
        "workers",
        "labor",
        "skills",
        "strategy",
        "governance",
        "practical",
        "action",
    ],
    4: [
        "cheating",
        "tools",
        "dependency",
        "ratchet",
        "adoption",
        "ethics",
        "norm",
    ],
    5: [
        "timeline",
        "threshold",
        "when",
        "prediction",
        "curve",
        "years",
        "exponential",
        "phase",
    ],
}
@dataclass
class Doc:
    """One research markdown file as loaded by ``load_research_docs``."""

    path: Path  # file the document was read from
    slug: str  # file name without its extension (``path.stem``)
    title: str  # text of the first "# " heading, or the slug when there is none
    text: str  # raw, unmodified file contents
    sections: dict[str, str]  # lower-cased heading -> body, from extract_sections
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the integrator.

    All four options take a filesystem path.  ``--project-root`` defaults to
    the directory two levels above this script and ``--out-dir`` to the
    script's own directory; ``--research-dir`` and ``--paper-008`` default to
    ``None``.
    """
    script_path = Path(__file__).resolve()
    cli = argparse.ArgumentParser(description=__doc__)
    path_options = (
        ("--project-root", {"default": script_path.parents[2]}),
        ("--research-dir", {}),
        ("--paper-008", {}),
        ("--out-dir", {"default": script_path.parent}),
    )
    for flag, extra in path_options:
        cli.add_argument(flag, type=Path, **extra)
    return cli.parse_args()
def clean_inline_md(text: str) -> str:
    """Strip inline markdown markup from *text* and normalise whitespace.

    Unwraps, in this order, backtick code spans, ``**bold**``, ``*italic*``
    and ``[label](target)`` links (keeping the label), then collapses every
    run of whitespace to a single space and trims both ends.
    """
    # Order matters: bold must be unwrapped before the single-star italic rule.
    unwrap_rules = (
        (r"`([^`]+)`", r"\1"),
        (r"\*\*([^*]+)\*\*", r"\1"),
        (r"\*([^*]+)\*", r"\1"),
        (r"\[(.*?)\]\((.*?)\)", r"\1"),
    )
    cleaned = text.strip()
    for pattern, replacement in unwrap_rules:
        cleaned = re.sub(pattern, replacement, cleaned)
    return re.sub(r"\s+", " ", cleaned).strip()
def extract_sections(text: str) -> dict[str, str]:
    """Split markdown *text* into a mapping of heading -> body.

    Every line matched by ``HEADING_RE`` (any heading level) opens a section
    that runs up to the next heading or the end of the text.  Keys are the
    heading text passed through ``clean_inline_md`` and lower-cased; values
    are the stripped body text.

    A heading that occurs more than once (for example a "Key Scholars"
    subsection repeated under several topics) has its bodies joined with a
    blank line, so an earlier body is never silently replaced by a later one.

    Returns an empty dict when *text* contains no headings.
    """
    matches = list(HEADING_RE.finditer(text))
    sections: dict[str, str] = {}
    for idx, match in enumerate(matches):
        heading = clean_inline_md(match.group(2)).lower()
        end = matches[idx + 1].start() if idx + 1 < len(matches) else len(text)
        body = text[match.end() : end].strip()
        previous = sections.get(heading)
        if previous:
            # Repeated heading: append instead of overwriting the earlier body.
            if body:
                sections[heading] = f"{previous}\n\n{body}"
        else:
            sections[heading] = body
    return sections
def load_research_docs(research_dir: Path) -> list[Doc]:
    """Load every ``*.md`` file directly inside *research_dir* as a ``Doc``.

    Files are processed in sorted path order and read as UTF-8.  The title is
    the cleaned text of the first line starting with ``"# "``; when no such
    line exists the file stem is used instead.
    """
    loaded: list[Doc] = []
    for md_path in sorted(research_dir.glob("*.md")):
        contents = md_path.read_text(encoding="utf-8")
        # Lazy generator: only the first H1 line is ever cleaned.
        h1_titles = (
            clean_inline_md(row[2:])
            for row in contents.splitlines()
            if row.startswith("# ")
        )
        loaded.append(
            Doc(
                path=md_path,
                slug=md_path.stem,
                title=next(h1_titles, md_path.stem),
                text=contents,
                sections=extract_sections(contents),
            )
        )
    return loaded
def normalize_person_name(name: str) -> str:
    """Return the lower-cased lookup key used to de-duplicate scholar names.

    Drops "et al.", spells "&" as "and", removes parenthesised text, replaces
    every character other than ASCII letters, space, ``.``, ``'`` and ``-``
    with a space, and collapses whitespace.
    """
    key = re.sub(r"\bet al\.?", "", name, flags=re.IGNORECASE).replace("&", " and ")
    for pattern, replacement in (
        (r"\([^)]*\)", ""),
        (r"[^A-Za-z .'-]", " "),
        (r"\s+", " "),
    ):
        key = re.sub(pattern, replacement, key)
    return key.strip().lower()
def split_possible_names(chunk: str) -> list[str]:
    """Extract candidate person names from the lead of a bullet.

    The chunk is cleaned of inline markdown, cut at the first ``:``, and has
    parenthesised and double-quoted text removed; ``&`` is spelled "and".
    Every ``NAME_RE`` match with at least two words, none of which is listed
    in ``BAD_NAME_WORDS``, is returned.

    When that yields nothing, a single-word fallback applies: if the chunk
    contains exactly one alphabetic token that is capitalised, longer than
    three characters and not a stop word, that token is returned.  Chunks
    with several words are rejected here; gluing their letters together would
    fabricate names such as "Foucaultonpower" and let "Executive Summary"
    slip past the stop-word check as "ExecutiveSummary".

    Returns a possibly empty list of names in order of appearance.
    """
    chunk = clean_inline_md(chunk)
    chunk = chunk.split(":", 1)[0]
    chunk = re.sub(r"\([^)]*\)", "", chunk)
    chunk = re.sub(r'"[^"]+"', "", chunk)
    chunk = chunk.replace("&", " and ")

    out: list[str] = []
    for name in NAME_RE.findall(chunk):
        name = re.sub(r"\s+", " ", name).strip(" .,:;")
        words = [w for w in name.split() if w and w[0].isalpha()]
        if len(words) >= 2 and not any(w.lower() in BAD_NAME_WORDS for w in words):
            out.append(" ".join(words))
    if out:
        return out

    # Single-word fallback (e.g. "Aristotle"): only valid when the chunk
    # really is one word once digits and punctuation are ignored.
    tokens = re.findall(r"[A-Za-z'-]+", chunk)
    if len(tokens) == 1:
        single = tokens[0]
        if (
            single[0].isupper()
            and single.lower() not in BAD_NAME_WORDS
            and len(single) > 3
        ):
            out.append(single)
    return out
def extract_scholars(docs: list[Doc]) -> dict[str, dict[str, Any]]:
    """Collect scholars listed in "key scholars" sections across *docs*.

    Candidate names come from bullet items of every section whose heading
    contains "key scholars": the first ``**bold**`` span of the bullet if
    there is one, otherwise the text before the first ``:``.

    Returns a dict keyed by ``normalize_person_name(name)``.  Each entry has:
      name          - first spelling encountered
      aliases       - sorted list of all spellings
      files         - sorted list of document slugs listing the scholar
      mention_count - number of sentences mentioning the surname, summed over
                      those documents (a document with no matching sentence
                      still counts once)
      contexts      - up to three ``{"file", "snippet"}`` items per document

    The surname (last word of the name) is matched as a whole word, so a
    short surname such as "Ong" no longer counts sentences that merely
    contain "long" or "among".
    """
    scholars: dict[str, dict[str, Any]] = {}
    for doc in docs:
        candidates: set[str] = set()
        for heading, section in doc.sections.items():
            if "key scholars" not in heading:
                continue
            for line in section.splitlines():
                bullet_match = BULLET_RE.match(line)
                if not bullet_match:
                    continue
                raw = bullet_match.group(1)
                bold_match = re.search(r"\*\*([^*]+)\*\*", raw)
                if bold_match:
                    lead = bold_match.group(1)
                else:
                    lead = clean_inline_md(raw).split(":", 1)[0]
                candidates.update(split_possible_names(lead))
        if not candidates:
            continue

        # Sentence preparation depends only on the document, so do it once
        # rather than once per candidate name.
        sentences = [clean_inline_md(s) for s in SENTENCE_SPLIT_RE.split(doc.text)]
        sentences_lower = [s.lower() for s in sentences]
        text_lower = doc.text.lower()

        for name in sorted(candidates):
            key = normalize_person_name(name)
            if not key:
                continue
            entry = scholars.setdefault(
                key,
                {
                    "name": name,
                    "aliases": set(),
                    "files": set(),
                    "mention_count": 0,
                    "contexts": [],
                },
            )
            entry["aliases"].add(name)
            entry["files"].add(doc.slug)

            surname = name.split()[-1].lower().strip(".,")
            local_mentions: list[str] = []
            if surname:
                # Lookarounds instead of \b so surnames ending in "'" or "-"
                # still match.
                surname_re = re.compile(rf"(?<!\w){re.escape(surname)}(?!\w)")
                local_mentions = [
                    sentence
                    for sentence, lowered in zip(sentences, sentences_lower)
                    if surname_re.search(lowered)
                ]
                # The surname can occur only in text that cleaning removes
                # (such as a link target); record that as a bare mention.
                if not local_mentions and surname_re.search(text_lower):
                    local_mentions = [f"Mentioned in {doc.slug}"]
            if not local_mentions:
                local_mentions = [f"Listed in {doc.slug}"]

            entry["mention_count"] += len(local_mentions)
            for snippet in local_mentions[:3]:
                entry["contexts"].append({"file": doc.slug, "snippet": snippet})

    # Sets are not JSON-serialisable; emit sorted lists.
    for entry in scholars.values():
        entry["aliases"] = sorted(entry["aliases"])
        entry["files"] = sorted(entry["files"])
    return scholars
def extract_title_from_source_line(line: str) -> str:
    """Guess the title of the work cited on one "sources" bullet.

    Tried in order: the first double-quoted span, the first ``*starred*``
    span, the text after a ``(YYYY)`` year up to the next full stop.  When
    none applies the whole cleaned line is returned.
    """
    # Quoted or starred spans are looked up in the raw line, before cleaning
    # removes the star markup.
    for pattern in (r'"([^"]+)"', r"\*([^*]+)\*"):
        span = re.search(pattern, line)
        if span is not None:
            return clean_inline_md(span.group(1))

    cleaned = clean_inline_md(line)
    year = re.search(r"\(\d{4}\)\.?", cleaned)
    if year is not None:
        after_year = cleaned[year.end() :].strip(" .:-")
        if after_year:
            return after_year.partition(".")[0].strip()
    return cleaned
def extract_authors_from_source_line(line: str) -> list[str]:
    """Return the author strings found on one "sources" bullet.

    The author part is everything before the first ``(YYYY)`` year, or the
    whole cleaned line when there is no year.  "&" counts as "and", "et al."
    is dropped, and the remainder is split on the word "and" and on ";".
    Pieces are trimmed of spaces, commas, full stops and hyphens and kept only
    if they contain at least one ASCII letter.
    """
    cleaned = clean_inline_md(line)
    year = re.search(r"\(\d{4}\)", cleaned)
    author_part = cleaned if year is None else cleaned[: year.start()].strip()
    author_part = re.sub(
        r"\bet al\.?", "", author_part.replace("&", " and "), flags=re.IGNORECASE
    )
    trimmed = (piece.strip(" ,.-") for piece in re.split(r"\band\b|;", author_part))
    return [piece for piece in trimmed if piece and re.search(r"[A-Za-z]", piece)]
def normalize_title(title: str) -> str:
    """Return the de-duplication key for a bibliography title.

    Lower-cases the title, turns every character other than ``a-z``, ``0-9``
    and space into a space, and collapses runs of spaces.
    """
    alnum_only = re.sub(r"[^a-z0-9 ]", " ", title.lower())
    # Only plain spaces can remain as whitespace here, so split/join collapses
    # and trims them in one step.
    return " ".join(alnum_only.split())
def extract_bibliography(docs: list[Doc]) -> dict[str, dict[str, Any]]:
    """Merge the "sources" bullets of all *docs* into one bibliography.

    Only sections whose heading starts with "sources" are read.  Entries are
    keyed by ``normalize_title`` of the extracted title and carry:
      title        - title as first seen
      authors      - sorted list of author strings
      files        - sorted list of slugs citing the work
      raw_mentions - cleaned text of every citing bullet
      relevance    - ``2 * len(files) + len(raw_mentions)``
    """
    merged: dict[str, dict[str, Any]] = {}
    for doc in docs:
        for heading, body in doc.sections.items():
            if not heading.startswith("sources"):
                continue
            for row in body.splitlines():
                item = BULLET_RE.match(row)
                if item is None:
                    continue
                citation = item.group(1)
                title = extract_title_from_source_line(citation)
                key = normalize_title(title) if title else ""
                if not key:
                    continue
                if key not in merged:
                    merged[key] = {
                        "title": title,
                        "authors": set(),
                        "files": set(),
                        "raw_mentions": [],
                    }
                record = merged[key]
                record["files"].add(doc.slug)
                record["raw_mentions"].append(clean_inline_md(citation))
                record["authors"].update(extract_authors_from_source_line(citation))

    for record in merged.values():
        record["authors"] = sorted(record["authors"])
        record["files"] = sorted(record["files"])
        record["relevance"] = 2 * len(record["files"]) + len(record["raw_mentions"])
    return merged
def first_sentence_with_marker(text: str, marker: str) -> str | None:
    """Return the first sentence of *text* containing *marker*, cleaned.

    *marker* is compared against the lower-cased sentence, so it must itself
    be lower case.  Returns ``None`` when no sentence contains it.
    """
    hits = (
        sentence
        for sentence in SENTENCE_SPLIT_RE.split(text)
        if marker in sentence.lower()
    )
    first = next(hits, None)
    return None if first is None else clean_inline_md(first)
def detect_contradictions(docs: list[Doc]) -> list[dict[str, Any]]:
    """Report ``TOPIC_RULES`` topics on which different files take opposite sides.

    For each rule and each side, every document contributes at most one piece
    of evidence: the first marker of that side found in its text, together
    with the sentence containing it.  A topic is reported only when at least
    one file appears solely on the supporting side and at least one other
    file solely on the challenging side.  At most four evidence items per
    side are included; the file lists are complete.
    """

    def gather(markers: list[str]) -> list[dict[str, str]]:
        # One evidence item per document: the first marker that yields a snippet.
        evidence: list[dict[str, str]] = []
        for doc in docs:
            haystack = doc.text.lower()
            for marker in markers:
                if marker not in haystack:
                    continue
                snippet = first_sentence_with_marker(doc.text, marker)
                if snippet:
                    evidence.append(
                        {"file": doc.slug, "marker": marker, "snippet": snippet}
                    )
                    break
        return evidence

    report: list[dict[str, Any]] = []
    for rule in TOPIC_RULES:
        supporting = gather(rule["pro_markers"])
        challenging = gather(rule["con_markers"])
        pro_files = {item["file"] for item in supporting}
        con_files = {item["file"] for item in challenging}
        only_pro = pro_files - con_files
        only_con = con_files - pro_files
        if not (only_pro and only_con):
            continue
        report.append(
            {
                "topic": rule["label"],
                "topic_id": rule["id"],
                "supports": supporting[:4],
                "challenges": challenging[:4],
                "supporting_files": sorted(pro_files),
                "challenging_files": sorted(con_files),
            }
        )
    return report
def extract_open_questions(paper_008: Path) -> list[dict[str, Any]]:
    """Read the numbered open questions from Paper 008.

    The questions are the numbered list items between the heading
    "## Open Questions for Paper 009" and the next level-2 heading (or the end
    of the file).  Each is returned as ``{"id": <number>, "text": <cleaned text>}``
    with any remaining ``**`` markers removed.

    Raises:
        RuntimeError: if the heading is missing or no numbered item follows it.
    """
    heading = "## Open Questions for Paper 009"
    _, found, remainder = paper_008.read_text(encoding="utf-8").partition(heading)
    if not found:
        raise RuntimeError("Could not find 'Open Questions for Paper 009' in paper 008")

    following = re.search(r"\n##\s+", remainder)
    block = remainder if following is None else remainder[: following.start()]

    numbered = (NUMBERED_RE.match(row) for row in block.splitlines())
    questions: list[dict[str, Any]] = [
        {
            "id": int(item.group(1)),
            # Unpaired "**" survives clean_inline_md; drop what is left.
            "text": clean_inline_md(item.group(2)).replace("**", ""),
        }
        for item in numbered
        if item
    ]
    if not questions:
        raise RuntimeError("No numbered open questions found in paper 008")
    return questions
def map_to_open_questions(
    docs: list[Doc], open_questions: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Score how well each document covers each open question.

    A document's score for a question is the number of
    ``QUESTION_KEYWORDS[question id]`` hits summed over its sentences (a
    keyword counts once per sentence, as a substring of the lower-cased
    sentence).  Scores of 12 or more are "high", 6 or more "medium",
    otherwise "low".  The first three matching sentences are kept as snippets.

    Returns one item per question, sorted by question id, with keys
    ``question_id``, ``question``, ``total_score`` and ``supporting_files``
    (documents with a non-zero score, highest score first).
    """
    # Splitting and cleaning sentences depends only on the document, so it is
    # done once here instead of once per question.
    prepared: list[tuple[str, list[tuple[str, str]]]] = []
    for doc in docs:
        cleaned = [clean_inline_md(s) for s in SENTENCE_SPLIT_RE.split(doc.text)]
        prepared.append((doc.slug, [(s, s.lower()) for s in cleaned]))

    coverage: list[dict[str, Any]] = []
    for question in open_questions:
        qid = question["id"]
        keywords = QUESTION_KEYWORDS.get(qid, [])
        file_scores: list[dict[str, Any]] = []
        total = 0
        for slug, sentences in prepared:
            score = 0
            snippets: list[str] = []
            for sentence_clean, sentence_lower in sentences:
                hits = sum(1 for kw in keywords if kw in sentence_lower)
                if hits:
                    score += hits
                    if len(snippets) < 3:
                        snippets.append(sentence_clean)
            if not score:
                continue
            total += score
            if score >= 12:
                level = "high"
            elif score >= 6:
                level = "medium"
            else:
                level = "low"
            file_scores.append(
                {
                    "file": slug,
                    "score": score,
                    "level": level,
                    "snippets": snippets,
                }
            )
        file_scores.sort(key=lambda x: x["score"], reverse=True)
        coverage.append(
            {
                "question_id": qid,
                "question": question["text"],
                "total_score": total,
                "supporting_files": file_scores,
            }
        )
    coverage.sort(key=lambda x: x["question_id"])
    return coverage
def extract_strongest_challenges(docs: list[Doc]) -> list[dict[str, Any]]:
    """Rank the counterargument bullets found across *docs*.

    Bullets are read from sections whose heading contains "counterarguments"
    or "critiques".  Each bullet scores 1 plus the weight of every
    ``CHALLENGE_KEYWORDS`` phrase it contains.  Bullets with identical text
    (ignoring case) are merged: scores add up and the files are unioned.

    Returns the ten highest-scoring items as ``{"text", "score", "files"}``.
    """
    merged: dict[str, dict[str, Any]] = {}
    for doc in docs:
        for heading, body in doc.sections.items():
            if "counterarguments" not in heading and "critiques" not in heading:
                continue
            for row in body.splitlines():
                found = BULLET_RE.match(row)
                if found is None:
                    continue
                text = clean_inline_md(found.group(1))
                lowered = text.lower()
                score = 1 + sum(
                    weight
                    for keyword, weight in CHALLENGE_KEYWORDS.items()
                    if keyword in lowered
                )
                bucket = merged.get(lowered)
                if bucket is None:
                    merged[lowered] = {"text": text, "score": score, "files": {doc.slug}}
                else:
                    bucket["score"] += score
                    bucket["files"].add(doc.slug)

    ranked = [
        {"text": item["text"], "score": item["score"], "files": sorted(item["files"])}
        for item in merged.values()
    ]
    ranked.sort(key=lambda row: row["score"], reverse=True)
    return ranked[:10]
def detect_emergent_themes(docs: list[Doc]) -> list[dict[str, Any]]:
    """Score a fixed set of cross-cutting themes against the whole corpus.

    A theme's score is the total number of (substring) occurrences of its
    keywords in the lower-cased text of all documents.  Themes scoring zero
    are omitted; the rest are returned as ``{"theme", "score"}`` items,
    highest score first.
    """
    themes = {
        "Governance and agency design": [
            "agency",
            "democratic",
            "community",
            "policy",
            "selective adoption",
            "governance",
        ],
        "Economic concentration and labor shift": [
            "labor",
            "capital",
            "commodity",
            "automation",
            "class",
            "pricing",
        ],
        "Epistemic reliability and grounding": [
            "understand",
            "stochastic",
            "illusion",
            "lossy",
            "falsifiable",
            "evidence",
        ],
        "Civilizational lock-in and resilience": [
            "lock-in",
            "path dependence",
            "retreat",
            "dependency",
            "ratchet",
            "reversal",
        ],
    }
    corpus = "\n".join(doc.text.lower() for doc in docs)
    totals = (
        (label, sum(corpus.count(word) for word in words))
        for label, words in themes.items()
    )
    present = [{"theme": label, "score": total} for label, total in totals if total > 0]
    return sorted(present, key=lambda row: row["score"], reverse=True)
def build_structured_result(
    docs: list[Doc],
    scholars: dict[str, dict[str, Any]],
    bibliography: dict[str, dict[str, Any]],
    contradictions: list[dict[str, Any]],
    open_question_coverage: list[dict[str, Any]],
    strongest_challenges: list[dict[str, Any]],
    emergent_themes: list[dict[str, Any]],
) -> dict[str, Any]:
    """Assemble all analysis outputs into the single result dict.

    Scholars are ranked by number of files, then mention count; bibliography
    items by relevance (both descending).  The remaining inputs are passed
    through unchanged.  ``meta`` lists the document slugs and their count.
    """

    def scholar_rank(entry: dict[str, Any]) -> tuple[int, int]:
        return (len(entry["files"]), entry["mention_count"])

    def source_rank(entry: dict[str, Any]) -> Any:
        return entry["relevance"]

    ranked_scholars = list(scholars.values())
    ranked_scholars.sort(key=scholar_rank, reverse=True)
    ranked_sources = list(bibliography.values())
    ranked_sources.sort(key=source_rank, reverse=True)

    meta = {
        "research_files": [doc.slug for doc in docs],
        "research_file_count": len(docs),
    }
    return {
        "meta": meta,
        "scholars": ranked_scholars,
        "bibliography": ranked_sources,
        "contradictions": contradictions,
        "open_question_coverage": open_question_coverage,
        "strongest_challenges": strongest_challenges,
        "emergent_themes": emergent_themes,
    }
def render_digest(result: dict[str, Any]) -> str:
    """Render the structured result as the markdown research digest.

    Sections, in order: scope, top 20 scholars, top 40 bibliography items,
    contradiction report, per-question coverage map (top four files and two
    snippets each) and the strongest challenges.

    Args:
        result: dict with the keys produced by ``build_structured_result``
            (``emergent_themes`` is not used here).

    Returns:
        The digest as a single string ending in a newline.
    """
    # Level names must be ranked explicitly: compared as plain strings they
    # sort alphabetically ("high" < "low" < "medium"), which would report
    # "low" as stronger than "high".
    level_rank = {"low": 0, "medium": 1, "high": 2}

    lines: list[str] = []
    lines.append("# Integrated Research Digest")
    lines.append("")
    lines.append("## Scope")
    lines.append(
        f"Processed {result['meta']['research_file_count']} research file(s): "
        + ", ".join(result["meta"]["research_files"])
    )
    lines.append("")

    lines.append("## Scholars by Frequency")
    for scholar in result["scholars"][:20]:
        files = ", ".join(scholar["files"])
        lines.append(
            f"- **{scholar['name']}** — files: {len(scholar['files'])}; mentions: {scholar['mention_count']}; in: {files}"
        )
    lines.append("")

    lines.append("## Unified Bibliography")
    for item in result["bibliography"][:40]:
        authors = ", ".join(item["authors"]) if item["authors"] else "Unknown"
        files = ", ".join(item["files"])
        lines.append(
            f"- **{item['title']}** ({authors}) — relevance {item['relevance']}; cited in: {files}"
        )
    lines.append("")

    lines.append("## Contradiction Report")
    if not result["contradictions"]:
        lines.append("- No cross-file contradictions detected by the current heuristic.")
    for item in result["contradictions"]:
        lines.append(f"### {item['topic']}")
        lines.append("- Supporting evidence:")
        for support in item["supports"]:
            lines.append(
                f" - `{support['file']}` ({support['marker']}): {support['snippet']}"
            )
        lines.append("- Challenging evidence:")
        for challenge in item["challenges"]:
            lines.append(
                f" - `{challenge['file']}` ({challenge['marker']}): {challenge['snippet']}"
            )
        lines.append("")

    lines.append("## Paper 009 Coverage Map")
    for item in result["open_question_coverage"]:
        if item["supporting_files"]:
            max_level = max(
                (fs["level"] for fs in item["supporting_files"]),
                key=lambda level: level_rank.get(level, -1),
            )
        else:
            max_level = "none"
        lines.append(
            f"### Q{item['question_id']} (total score {item['total_score']}, strongest level {max_level})"
        )
        lines.append(f"{item['question']}")
        if not item["supporting_files"]:
            lines.append("- No supporting material detected.")
            continue
        for fs in item["supporting_files"][:4]:
            lines.append(f"- `{fs['file']}`: score {fs['score']} ({fs['level']})")
            for snip in fs["snippets"][:2]:
                lines.append(f" - {snip}")
        lines.append("")

    lines.append("## Strongest Challenges")
    if not result["strongest_challenges"]:
        lines.append("- No challenge bullets detected.")
    for item in result["strongest_challenges"]:
        lines.append(
            f"- **Score {item['score']}** ({', '.join(item['files'])}): {item['text']}"
        )
    return "\n".join(lines) + "\n"
def render_outline(result: dict[str, Any]) -> str:
    """Render the suggested Paper 009 outline as markdown.

    Open questions are ordered by total coverage score, highest first.  The
    outline names the two most and two least supported questions, proposes one
    section per question (with up to three evidence files and up to two
    anchor claims), then lists the top five challenges and top four emergent
    themes.  The returned string ends in a newline.
    """

    def coverage_bullet(entry: dict[str, Any]) -> str:
        return f" - Q{entry['question_id']} (score {entry['total_score']}): {entry['question']}"

    by_support = sorted(
        result["open_question_coverage"],
        key=lambda entry: entry["total_score"],
        reverse=True,
    )

    out: list[str] = [
        "# Suggested Outline for Paper 009",
        "",
        "## Why This Sequence",
        "Order starts with heavily-supported questions, then closes with low-coverage questions that require new argumentation or new research.",
        "",
        "## Coverage Priorities",
        "- Most supported open questions:",
    ]
    out.extend(coverage_bullet(entry) for entry in by_support[:2])
    out.append("- Least supported open questions:")
    # A tail slice already yields the whole list when it has fewer than two items.
    out.extend(coverage_bullet(entry) for entry in by_support[-2:])
    out.append("")

    out.append("## Proposed Sections")
    for entry in by_support:
        number = entry["question_id"]
        out.append(f"### Section {number}: Q{number}")
        out.append(entry["question"])
        evidence = entry["supporting_files"]
        if not evidence:
            out.append("- Primary evidence files: none detected; requires fresh synthesis.")
        else:
            names = ", ".join(source["file"] for source in evidence[:3])
            out.append(f"- Primary evidence files: {names}")
            for source in evidence[:2]:
                for claim in source["snippets"][:1]:
                    out.append(f"- Anchor claim: {claim}")
        out.append("")

    out.append("## Cross-Cutting Counterarguments To Address Explicitly")
    out.extend(
        f"- {challenge['text']} ({', '.join(challenge['files'])})"
        for challenge in result["strongest_challenges"][:5]
    )
    out.append("")

    out.append("## New Themes To Add Beyond Original Open Questions")
    out.extend(
        f"- {theme['theme']} (signal score {theme['score']})"
        for theme in result["emergent_themes"][:4]
    )
    return "\n".join(out) + "\n"
def main() -> int:
    """Run the integration pipeline end to end.

    Resolves the input and output locations from the command line, loads the
    research files and the open questions of Paper 008, runs every analysis
    step, and writes ``integrated.json``, ``digest.md`` and
    ``009_outline_suggestion.md`` into the output directory, reporting
    progress on stdout.

    Returns:
        0 once all three files are written.
    """
    args = parse_args()
    project_root = args.project_root.resolve()

    research_dir = args.research_dir
    if research_dir is None:
        research_dir = project_root / "research"
    research_dir = research_dir.resolve()

    paper_008 = args.paper_008
    if paper_008 is None:
        paper_008 = project_root / "008-the-ship-of-theseus.md"
    paper_008 = paper_008.resolve()

    out_dir = args.out_dir.resolve()
    out_dir.mkdir(parents=True, exist_ok=True)

    print(f"[integrator] project root: {project_root}")
    print(f"[integrator] research dir: {research_dir}")
    print(f"[integrator] paper 008: {paper_008}")
    print(f"[integrator] output dir: {out_dir}")

    docs = load_research_docs(research_dir)
    print(f"[integrator] loaded {len(docs)} research file(s)")
    if not docs:
        # Not fatal: the pipeline still runs and produces empty reports.
        print("[integrator] no research files found; writing empty digest/outline")

    open_questions = extract_open_questions(paper_008)
    print(f"[integrator] extracted {len(open_questions)} open question(s) from Paper 008")

    scholars = extract_scholars(docs)
    print(f"[integrator] extracted {len(scholars)} unique scholar name(s)")

    bibliography = extract_bibliography(docs)
    print(f"[integrator] extracted {len(bibliography)} bibliography item(s)")

    contradictions = detect_contradictions(docs)
    print(f"[integrator] detected {len(contradictions)} contradiction topic(s)")

    coverage = map_to_open_questions(docs, open_questions)
    print("[integrator] mapped research evidence to Paper 008 open questions")

    strongest_challenges = extract_strongest_challenges(docs)
    print(f"[integrator] ranked {len(strongest_challenges)} strongest challenge(s)")

    emergent_themes = detect_emergent_themes(docs)
    print(f"[integrator] found {len(emergent_themes)} emergent theme(s)")

    result = build_structured_result(
        docs,
        scholars,
        bibliography,
        contradictions,
        coverage,
        strongest_challenges,
        emergent_themes,
    )

    # Each renderer runs just before its own file is written.
    writers = (
        ("integrated.json", lambda: json.dumps(result, indent=2)),
        ("digest.md", lambda: render_digest(result)),
        ("009_outline_suggestion.md", lambda: render_outline(result)),
    )
    written: list[Path] = []
    for filename, render in writers:
        target = out_dir / filename
        target.write_text(render(), encoding="utf-8")
        written.append(target)
    for target in written:
        print(f"[integrator] wrote {target}")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())