"""Offline canonical vote-context parsing, matching, classification, and scoring helpers.""" from __future__ import annotations import hashlib import logging import re import subprocess import xml.etree.ElementTree as ET from dataclasses import dataclass from datetime import UTC, date, datetime from pathlib import Path from typing import TYPE_CHECKING, Any from zoneinfo import ZoneInfo from sqlalchemy import delete, select from sqlalchemy.orm import Session, joinedload, selectinload from pipelines.parallelize import parallelize_thread from pipelines.orm.data_science_dev.congress import ( Amendment, AmendmentAction, AmendmentActionRecordedVote, Bill, BillAction, BillActionRecordedVote, BillRelation, BillText, ClassificationMethod, ConfidenceLevel, IngestRun, MeasureFunction, MeasureSubtype, ScoreRun, SourceArtifact, SubjectType, TextResolutionMethod, TextTargetBasis, TextTargetType, Vote, VoteActionMatch, VoteActionScope, VoteClassification, VoteContextAudit, VoteEffect, VoteMeasureLink, VoteMeasureRole, VotePositionMeaning, VoteRelationship, VoteTextTarget, ) if TYPE_CHECKING: from collections.abc import Iterable, Sequence logger = logging.getLogger(__name__) CLASSIFICATION_VERSION = "canonical_vote_context_v3" SCORING_VERSION = "canonical_vote_scores_v3" EASTERN_TIMEZONE = ZoneInfo("America/New_York") OFFLINE_BILLSTATUS_MIN_CONGRESS = 108 PARALLEL_FILE_CHUNK_SIZE = 500 PARALLEL_PROGRESS_TRACKER = 250 DIRECT_TEXT_PATTERNS = ( "on passage", "on passage of the bill", "on the bill", "on the joint resolution", "on agreeing to the resolution", "on motion to suspend the rules and pass", "on motion to suspend the rules and agree", "on motion to suspend the rules and concur", "on agreeing to the conference report", "on motion to concur", "passed house", "passed senate", ) AMENDMENT_DIRECT_PATTERNS = ( "on the amendment", "on agreeing to the amendment", "agreeing to the amendment", ) PROCEDURAL_PATTERNS = ( "cloture", "motion to proceed", "motion to recommit", "motion to reconsider", "motion to table", "previous question", "ordering the previous question", "rule", ) NON_LEGISLATIVE_PATTERNS = ( "nomination", "treaty", "speaker", "quorum", "journal", "adjourn", ) SPECIAL_RULE_PATTERNS = ( "providing for consideration of", "providing for the consideration of", ) MEASURE_REF_RE = re.compile( r"\b(?PH\.?\s*R\.|S\.|H\.?\s*J\.?\s*Res\.|S\.?\s*J\.?\s*Res\." r"|H\.?\s*Con\.?\s*Res\.|S\.?\s*Con\.?\s*Res\.|H\.?\s*Res\.|S\.?\s*Res\.)" r"\s*(?P\d+)\b", flags=re.IGNORECASE, ) @dataclass(frozen=True) class ParsedRecordedVote: congress: int chamber: str session_number: int roll_number: int vote_datetime: datetime | None vote_url: str | None @dataclass(frozen=True) class ParsedAction: sequence: int action_date: date action_time: str | None action_text: str action_type: str | None action_code: str | None source_system_code: str | None source_system_name: str | None recorded_votes: tuple[ParsedRecordedVote, ...] @dataclass(frozen=True) class ParsedBillRelation: related_key: tuple[int, str, int] relationship_type: str identified_by: str | None latest_action_date: date | None latest_action_text: str | None @dataclass(frozen=True) class ParsedTextVersion: version_code: str version_name: str | None version_date: date | None source_datetime_raw: str | None text_url_xml: str | None text_url_pdf: str | None text_url_html: str | None @dataclass(frozen=True) class ParsedBillStatus: bill_key: tuple[int, str, int] actions: tuple[ParsedAction, ...] relations: tuple[ParsedBillRelation, ...] text_versions: tuple[ParsedTextVersion, ...] @dataclass(frozen=True) class ActionCandidate: scope: VoteActionScope bill_action: BillAction | None amendment_action: AmendmentAction | None score: int match_method: str match_reason: str match_confidence: ConfidenceLevel @property def selected_action_text(self) -> str: if self.bill_action is not None: return self.bill_action.action_text if self.amendment_action is not None: return self.amendment_action.action_text return "" def _chunked[T](items: Sequence[T], chunk_size: int) -> Iterable[Sequence[T]]: """Yield fixed-size slices from a sequence.""" for start in range(0, len(items), chunk_size): yield items[start : start + chunk_size] def get_git_sha(repo_root: Path | None = None) -> str | None: """Best-effort current git SHA for audit/run metadata.""" try: completed = subprocess.run( ["git", "rev-parse", "HEAD"], cwd=repo_root, check=True, capture_output=True, text=True, ) except (OSError, subprocess.SubprocessError): return None return completed.stdout.strip() or None def start_ingest_run( session: Session, *, source_snapshot_label: str, repo_root: Path | None = None, ) -> IngestRun: """Create and persist an ingest run row.""" ingest_run = IngestRun( started_at=datetime.now(UTC), git_sha=get_git_sha(repo_root), classifier_version=CLASSIFICATION_VERSION, source_snapshot_label=source_snapshot_label, status="running", ) session.add(ingest_run) session.commit() return ingest_run def finish_ingest_run( session: Session, ingest_run_id: int, *, status: str, ) -> None: """Mark an ingest run complete.""" ingest_run = session.get(IngestRun, ingest_run_id) if ingest_run is None: return ingest_run.completed_at = datetime.now(UTC) ingest_run.status = status session.commit() def register_source_artifact( session: Session, *, path: Path, source_kind: str, congress: int, chamber: str | None, ingest_run_id: int | None, source_url: str | None = None, ) -> SourceArtifact: """Track the exact local file used for a parsed object.""" payload = path.read_bytes() sha256 = hashlib.sha256(payload).hexdigest() modified_at = datetime.fromtimestamp(path.stat().st_mtime, tz=UTC) artifact = session.scalar( select(SourceArtifact).where( SourceArtifact.local_path == str(path), SourceArtifact.sha256 == sha256, SourceArtifact.ingest_run_id == ingest_run_id, ) ) if artifact is not None: return artifact artifact = SourceArtifact( source_kind=source_kind, congress=congress, chamber=chamber, local_path=str(path), source_url=source_url, sha256=sha256, byte_size=len(payload), modified_at=modified_at, ingested_at=datetime.now(UTC), ingest_run_id=ingest_run_id, ) session.add(artifact) session.flush() return artifact def derive_session_number(congress: int, session_year: int) -> int: """Convert a calendar session year into congressional session number 1 or 2.""" congress_start_year = ((congress - 1) * 2) + 1789 return session_year - congress_start_year + 1 def normalize_chamber(raw: str | None) -> str | None: """Normalize source-specific chamber labels.""" if raw is None: return None value = raw.strip().lower() mapping = { "h": "House", "house": "House", "s": "Senate", "senate": "Senate", } return mapping.get(value, raw) def parse_date_like(value: Any) -> date | None: """Parse a date-like string into a date object.""" if value is None: return None if isinstance(value, date) and not isinstance(value, datetime): return value text = str(value).strip() if not text: return None try: return date.fromisoformat(text[:10]) except ValueError: return None def parse_datetime_like( value: Any, *, fallback_time: str | None = None, ) -> datetime | None: """Parse ISO-ish datetime strings from local vote and billstatus sources.""" if value is None: return None if isinstance(value, datetime): return value.astimezone(UTC) if value.tzinfo else value.replace(tzinfo=UTC) text = str(value).strip() if not text: return None normalized = text.replace("Z", "+00:00") try: parsed = datetime.fromisoformat(normalized) except ValueError: if fallback_time: fallback = f"{text[:10]}T{fallback_time}" try: parsed = datetime.fromisoformat(fallback) except ValueError: return None else: try: parsed = datetime.fromisoformat(f"{text[:10]}T00:00:00") except ValueError: return None if parsed.tzinfo is None: return parsed.replace(tzinfo=UTC) return parsed.astimezone(UTC) def legislative_date_for_comparison( *, vote_datetime: datetime | None, fallback_date: date | None, ) -> date | None: """Compare by chamber-local legislative day when a datetime is available.""" if vote_datetime is not None: return vote_datetime.astimezone(EASTERN_TIMEZONE).date() return fallback_date def measure_subtype_for_bill_type(bill_type: str | None) -> MeasureSubtype | None: """Map repository bill types to normalized measure subtypes.""" if bill_type is None: return None normalized = bill_type.strip().lower() if normalized in {"hr", "s"}: return MeasureSubtype.BILL if normalized in {"hjres", "sjres"}: return MeasureSubtype.JOINT_RESOLUTION if normalized in {"hconres", "sconres"}: return MeasureSubtype.CONCURRENT_RESOLUTION if normalized in {"hres", "sres"}: return MeasureSubtype.SIMPLE_RESOLUTION return None def measure_type_value(bill_type: str | None) -> str | None: """Expose the measure type as a stable, upper-case code.""" if bill_type is None: return None return bill_type.upper() def is_house_origin_measure(bill: Bill) -> bool: """Return True when a bill/resolution originates in the House.""" return bill.bill_type.lower().startswith("h") def is_senate_origin_measure(bill: Bill) -> bool: """Return True when a bill/resolution originates in the Senate.""" return bill.bill_type.lower().startswith("s") def normalized_text(*parts: str | None) -> str: """Normalize action/question/title text for semantic matching.""" joined = " ".join(part for part in parts if part) return " ".join(joined.casefold().split()) def has_amendment_signal(*parts: str | None, raw_amendment_ref: dict | None = None) -> bool: """Heuristic for amendment-related votes.""" if raw_amendment_ref: return True text = normalized_text(*parts) return "amendment" in text def is_non_legislative_question(*parts: str | None) -> bool: """Classify votes that are not about legislation or legislative text.""" text = normalized_text(*parts) return any(pattern in text for pattern in NON_LEGISLATIVE_PATTERNS) def is_procedural_question(*parts: str | None) -> bool: """Heuristic for procedural motions related to legislation.""" text = normalized_text(*parts) return any(pattern in text for pattern in PROCEDURAL_PATTERNS) def is_direct_measure_text_question(*parts: str | None) -> bool: """Heuristic for direct measure/resolution text votes.""" text = normalized_text(*parts) if any(pattern in text for pattern in DIRECT_TEXT_PATTERNS): return True return ( "agreeing to the resolution" in text or "suspend the rules" in text or "conference report" in text or "passed house" in text or "passed senate" in text ) def is_direct_amendment_text_question(*parts: str | None) -> bool: """Heuristic for direct amendment adoption votes.""" text = normalized_text(*parts) if "motion to table the amendment" in text: return False return any(pattern in text for pattern in AMENDMENT_DIRECT_PATTERNS) def is_special_rule_measure( *, bill: Bill | None, question: str | None, action_text: str | None, ) -> bool: """Detect House special rules that govern another measure.""" if bill is None or measure_subtype_for_bill_type(bill.bill_type) is not MeasureSubtype.SIMPLE_RESOLUTION: return False text = normalized_text( question, action_text, bill.title, bill.title_short, bill.official_title, ) return any(pattern in text for pattern in SPECIAL_RULE_PATTERNS) def measure_function_for_vote( *, bill: Bill | None, question: str | None, action_text: str | None, ) -> MeasureFunction | None: """Semantic function of the voted-on measure.""" subtype = measure_subtype_for_bill_type(bill.bill_type if bill else None) if subtype is None: return None if is_special_rule_measure(bill=bill, question=question, action_text=action_text): return MeasureFunction.SPECIAL_RULE text = normalized_text( question, action_text, bill.title if bill else None, bill.title_short if bill else None, bill.official_title if bill else None, ) if "budget resolution" in text: return MeasureFunction.BUDGET_RESOLUTION if subtype is MeasureSubtype.SIMPLE_RESOLUTION and ( "rules of the house" in text or "electing the speaker" in text or "authorizing the speaker" in text ): return MeasureFunction.CHAMBER_INTERNAL if "sense of" in text or "commemorat" in text or "congratulating" in text: return MeasureFunction.COMMEMORATIVE_OR_SENSE_OF return MeasureFunction.SUBSTANTIVE_MEASURE def parse_measure_references( text: str | None, *, congress: int, ) -> list[tuple[int, str, int]]: """Extract referenced measures from question/title/action text.""" if not text: return [] refs: list[tuple[int, str, int]] = [] for match in MEASURE_REF_RE.finditer(text): raw_type = match.group("type").casefold() number = int(match.group("number")) normalized_type = ( raw_type.replace(".", "") .replace(" ", "") .replace("conres", "conres") .replace("jres", "jres") .replace("res", "res") ) normalized_type = normalized_type.replace("hr", "hr", 1) if normalized_type == "s": bill_type = "s" elif normalized_type == "hr": bill_type = "hr" else: bill_type = normalized_type refs.append((congress, bill_type, number)) return refs def require_billstatus_artifacts(congress_dirs: Sequence[Path]) -> None: """Fail fast when canonical offline context files are unavailable.""" missing = [ congress_dir for congress_dir in congress_dirs if not any((congress_dir / "bills").rglob("fdsys_billstatus.xml")) ] if missing: congress_list = ", ".join(path.name for path in missing) msg = ( "Canonical offline vote-context resolution requires local BILLSTATUS " f"artifacts. Missing fdsys_billstatus.xml under congress directories: {congress_list}" ) raise RuntimeError(msg) def filter_context_supported_congress_dirs( congress_dirs: Sequence[Path], ) -> list[Path]: """Return only congress directories supported by offline BILLSTATUS coverage.""" supported: list[Path] = [] skipped: list[Path] = [] for congress_dir in congress_dirs: congress_number = int(congress_dir.name) if congress_number < OFFLINE_BILLSTATUS_MIN_CONGRESS: skipped.append(congress_dir) else: supported.append(congress_dir) if skipped: logger.info( "Skipping canonical vote-context steps for pre-%sth Congress directories: %s", OFFLINE_BILLSTATUS_MIN_CONGRESS, ", ".join(path.name for path in skipped), ) return supported def _xml_local_name(tag: str) -> str: return tag.rsplit("}", 1)[-1] def _xml_text(element: ET.Element | None, *names: str) -> str | None: if element is None: return None for descendant in element.iter(): if _xml_local_name(descendant.tag) in names: text = descendant.text.strip() if descendant.text else None if text: return text return None def _xml_direct_children(element: ET.Element, *names: str) -> list[ET.Element]: return [child for child in list(element) if _xml_local_name(child.tag) in names] def _xml_direct_child(element: ET.Element, *names: str) -> ET.Element | None: for child in list(element): if _xml_local_name(child.tag) in names: return child return None def parse_billstatus_file(path: Path) -> ParsedBillStatus | None: """Parse the official Bill Status XML needed for actions, relations, and text versions.""" try: root = ET.fromstring(path.read_bytes()) except ET.ParseError: logger.exception("Failed to parse bill status XML: %s", path) return None bill_node = _xml_direct_child(root, "bill") if bill_node is None: bill_node = root congress_text = _xml_text(bill_node, "congress") bill_type_text = _xml_text(bill_node, "billType", "bill-type", "type") bill_number_text = _xml_text(bill_node, "billNumber", "bill-number", "number") if not congress_text or not bill_type_text or not bill_number_text: return None bill_key = (int(congress_text), bill_type_text.strip().lower(), int(bill_number_text)) actions_parent = _xml_direct_child(bill_node, "actions") actions: list[ParsedAction] = [] if actions_parent is not None: for index, item in enumerate(_xml_direct_children(actions_parent, "item", "action"), start=1): action_date = parse_date_like(_xml_text(item, "actionDate", "actedAt", "action-date", "acted_at")) action_text = _xml_text(item, "text") or "" if action_date is None or not action_text: continue source_system = _xml_direct_child(item, "sourceSystem") recorded_votes_parent = _xml_direct_child(item, "recordedVotes") recorded_votes: list[ParsedRecordedVote] = [] if recorded_votes_parent is not None: for vote_item in _xml_direct_children(recorded_votes_parent, "recordedVote", "item"): roll_number = _xml_text(vote_item, "rollNumber", "roll-number") chamber = normalize_chamber(_xml_text(vote_item, "chamber")) congress = _xml_text(vote_item, "congress") session_number = _xml_text(vote_item, "sessionNumber", "session-number") if not roll_number or chamber is None or not congress or not session_number: continue recorded_votes.append( ParsedRecordedVote( congress=int(congress), chamber=chamber, session_number=int(session_number), roll_number=int(roll_number), vote_datetime=parse_datetime_like(_xml_text(vote_item, "date")), vote_url=_xml_text(vote_item, "url"), ) ) actions.append( ParsedAction( sequence=index, action_date=action_date, action_time=_xml_text(item, "actionTime", "action-time"), action_text=action_text, action_type=_xml_text(item, "type"), action_code=_xml_text(item, "actionCode", "action-code"), source_system_code=_xml_text(source_system, "code"), source_system_name=_xml_text(source_system, "name"), recorded_votes=tuple(recorded_votes), ) ) relations: list[ParsedBillRelation] = [] related_parent = _xml_direct_child(bill_node, "relatedBills", "relatedBillDetails") if related_parent is not None: for item in _xml_direct_children(related_parent, "item", "relatedBill", "relatedBillDetail"): relation_congress = _xml_text(item, "congress") relation_type = _xml_text(item, "type", "billType") relation_number = _xml_text(item, "number", "billNumber") if not relation_congress or not relation_type or not relation_number: continue relationship_details = _xml_direct_child(item, "relationshipDetails") relationship_item = ( _xml_direct_child(relationship_details, "item") if relationship_details is not None else None ) relations.append( ParsedBillRelation( related_key=( int(relation_congress), relation_type.strip().lower(), int(relation_number), ), relationship_type=( _xml_text( relationship_item, "relationshipType", "relationship-type", "typeOfRelationship", "type", ) or _xml_text( item, "relationshipType", "relationship-type", "typeOfRelationship", ) or "related" ), identified_by=_xml_text( relationship_item, "identifiedBy", "identified-by", ) or _xml_text(item, "identifiedBy", "identified-by"), latest_action_date=parse_date_like(_xml_text(item, "latestActionDate", "latest-action-date")), latest_action_text=_xml_text(item, "latestActionText", "latest-action-text", "latestAction"), ) ) text_versions: list[ParsedTextVersion] = [] titles_parent = _xml_direct_child(bill_node, "titles") title_version_name_to_code: dict[str, str] = {} if titles_parent is not None: for item in _xml_direct_children(titles_parent, "item", "title"): version_name = _xml_text(item, "billTextVersionName") version_code = _xml_text(item, "billTextVersionCode") if version_name and version_code: title_version_name_to_code.setdefault( normalized_text(version_name), version_code.lower(), ) text_versions_parent = _xml_direct_child(bill_node, "textVersions") if text_versions_parent is not None: for item in _xml_direct_children(text_versions_parent, "item", "textVersion"): version_name = _xml_text(item, "type", "versionName") version_code = _xml_text(item, "billTextVersionCode", "versionCode", "typeCode") if version_code is None and version_name is not None: version_code = title_version_name_to_code.get(normalized_text(version_name)) raw_date = _xml_text(item, "date") if not version_code and not version_name: continue formats_parent = _xml_direct_child(item, "formats") xml_url = None pdf_url = None html_url = None if formats_parent is not None: for format_item in _xml_direct_children(formats_parent, "item", "format"): format_type = normalized_text(_xml_text(format_item, "type"), _xml_text(format_item, "name")) url = _xml_text(format_item, "url") if not url: continue if "xml" in format_type: xml_url = url elif "pdf" in format_type: pdf_url = url elif "html" in format_type or "formatted text" in format_type: html_url = url text_versions.append( ParsedTextVersion( version_code=(version_code or version_name or "").lower(), version_name=version_name, version_date=parse_date_like(raw_date), source_datetime_raw=raw_date, text_url_xml=xml_url, text_url_pdf=pdf_url, text_url_html=html_url, ) ) return ParsedBillStatus( bill_key=bill_key, actions=tuple(actions), relations=tuple(relations), text_versions=tuple(text_versions), ) def _parse_billstatus_path(*, path: Path) -> ParsedBillStatus | None: """Thread-friendly wrapper for billstatus XML parsing.""" return parse_billstatus_file(path) def _read_json_path(*, path: Path) -> dict[str, Any] | None: """Thread-friendly wrapper for amendment JSON loading.""" return _read_json(path) def merge_billstatus_text_versions_for_bill( *, bill_id: int, parsed_text_versions: Sequence[ParsedTextVersion], source_artifact_id: int | None, existing_bill_texts: dict[tuple[int, str], BillText], ) -> list[BillText]: """Create or enrich BillText rows from official billstatus metadata. This fills metadata-only bill text rows when local text-versions artifacts do not exist, which allows vote->text resolution to link to an official version even without local content. """ created: list[BillText] = [] for version in parsed_text_versions: version_code = version.version_code.lower() key = (bill_id, version_code) existing = existing_bill_texts.get(key) if existing is None: bill_text = BillText( bill_id=bill_id, version_code=version_code, version_name=version.version_name, text_content=None, date=version.version_date, source_datetime_raw=version.source_datetime_raw, text_url_xml=version.text_url_xml, text_url_pdf=version.text_url_pdf, text_url_html=version.text_url_html, source_artifact_id=source_artifact_id, ) existing_bill_texts[key] = bill_text created.append(bill_text) continue if existing.version_name is None and version.version_name is not None: existing.version_name = version.version_name if existing.date is None and version.version_date is not None: existing.date = version.version_date if existing.source_datetime_raw is None and version.source_datetime_raw is not None: existing.source_datetime_raw = version.source_datetime_raw if existing.text_url_xml is None and version.text_url_xml is not None: existing.text_url_xml = version.text_url_xml if existing.text_url_pdf is None and version.text_url_pdf is not None: existing.text_url_pdf = version.text_url_pdf if existing.text_url_html is None and version.text_url_html is not None: existing.text_url_html = version.text_url_html if existing.source_artifact_id is None and source_artifact_id is not None: existing.source_artifact_id = source_artifact_id return created def build_billstatus_text_version_index( congress_dirs: Sequence[Path], ) -> dict[tuple[int, str, int], dict[str, ParsedTextVersion]]: """Index text-version metadata by bill key and version code.""" index: dict[tuple[int, str, int], dict[str, ParsedTextVersion]] = {} for congress_dir in congress_dirs: billstatus_paths = sorted((congress_dir / "bills").rglob("fdsys_billstatus.xml")) for chunk in _chunked(billstatus_paths, PARALLEL_FILE_CHUNK_SIZE): results = parallelize_thread( _parse_billstatus_path, [{"path": path} for path in chunk], progress_tracker=PARALLEL_PROGRESS_TRACKER, ) for parsed in results.results: if parsed is None: continue version_map = index.setdefault(parsed.bill_key, {}) for version in parsed.text_versions: version_map.setdefault(version.version_code.lower(), version) return index def raw_bill_key_from_ref( raw_bill_ref: dict[str, Any] | None, *, default_congress: int, ) -> tuple[int, str, int] | None: """Resolve a raw vote-side bill reference into the canonical bill key.""" if not raw_bill_ref: return None raw_type = raw_bill_ref.get("type") raw_number = raw_bill_ref.get("number") if raw_type is None or raw_number is None: return None raw_congress = raw_bill_ref.get("congress", default_congress) try: return (int(raw_congress), str(raw_type).lower(), int(raw_number)) except (TypeError, ValueError): return None def parse_vote_source_url(raw_vote: dict[str, Any]) -> str | None: """Best-effort raw vote source URL from vote JSON.""" for key in ("url", "source_url", "sourceUrl"): value = raw_vote.get(key) if isinstance(value, str) and value: return value return None def coerce_raw_ref(raw_value: Any) -> dict[str, Any] | None: """Preserve raw refs as JSON-ish dictionaries.""" if raw_value is None: return None if isinstance(raw_value, dict): return raw_value return {"value": raw_value} def parsed_vote_datetime(raw_vote: dict[str, Any]) -> datetime | None: """Build a full vote datetime when the source exposes one.""" raw_date = raw_vote.get("date") raw_time = raw_vote.get("time") if raw_time is not None and isinstance(raw_date, str): return parse_datetime_like(raw_date, fallback_time=str(raw_time)) return parse_datetime_like(raw_date) def ingest_bill_status_context( session: Session, *, congress_dirs: Sequence[Path], bill_map: dict[tuple[int, str, int], int], ingest_run_id: int | None, ) -> None: """Rebuild bill actions, relations, amendments, and their recorded votes.""" require_billstatus_artifacts(congress_dirs) congress_numbers = [int(path.name) for path in congress_dirs] bill_ids_subquery = select(Bill.id).where(Bill.congress.in_(congress_numbers)) existing_bill_texts = { (bill_text.bill_id, bill_text.version_code.lower()): bill_text for bill_text in session.scalars( select(BillText) .join(Bill, Bill.id == BillText.bill_id) .where(Bill.congress.in_(congress_numbers)) ).all() } session.execute( delete(BillRelation).where(BillRelation.bill_id.in_(bill_ids_subquery)) ) session.execute(delete(BillAction).where(BillAction.bill_id.in_(bill_ids_subquery))) session.execute(delete(Amendment).where(Amendment.congress.in_(congress_numbers))) session.commit() for congress_dir in congress_dirs: bills_dir = congress_dir / "bills" if not bills_dir.is_dir(): logger.warning(f"Missing bills directory for congress {congress_dir.name}: {bills_dir}") continue billstatus_paths = sorted(bills_dir.rglob("fdsys_billstatus.xml")) logger.info( "Scanning %d bill status files from %s", len(billstatus_paths), congress_dir.name, ) for chunk in _chunked(billstatus_paths, PARALLEL_FILE_CHUNK_SIZE): results = parallelize_thread( _parse_billstatus_path, [{"path": path} for path in chunk], progress_tracker=PARALLEL_PROGRESS_TRACKER, ) for path, parsed in zip(chunk, results.results, strict=True): if parsed is None: continue bill_id = bill_map.get(parsed.bill_key) if bill_id is None: continue artifact = register_source_artifact( session, path=path, source_kind="billstatus_xml", congress=parsed.bill_key[0], chamber=None, ingest_run_id=ingest_run_id, ) session.add_all( merge_billstatus_text_versions_for_bill( bill_id=bill_id, parsed_text_versions=parsed.text_versions, source_artifact_id=artifact.id, existing_bill_texts=existing_bill_texts, ) ) for relation in parsed.relations: related_bill_id = bill_map.get(relation.related_key) if related_bill_id is None: continue session.add( BillRelation( bill_id=bill_id, related_bill_id=related_bill_id, relationship_type=relation.relationship_type, identified_by=relation.identified_by, latest_action_date=relation.latest_action_date, latest_action_text=relation.latest_action_text, ) ) for action in parsed.actions: bill_action = BillAction( bill_id=bill_id, sequence=action.sequence, action_date=action.action_date, action_time=action.action_time, action_text=action.action_text, action_type=action.action_type, action_code=action.action_code, source_system_code=action.source_system_code, source_system_name=action.source_system_name, source_artifact_id=artifact.id, ) session.add(bill_action) session.flush() for recorded_vote in action.recorded_votes: session.add( BillActionRecordedVote( bill_action_id=bill_action.id, congress=recorded_vote.congress, chamber=recorded_vote.chamber, session_number=recorded_vote.session_number, roll_number=recorded_vote.roll_number, vote_datetime=recorded_vote.vote_datetime, vote_url=recorded_vote.vote_url, ) ) amendments_dir = congress_dir / "amendments" if amendments_dir.is_dir(): amendment_paths = sorted(amendments_dir.rglob("data.json")) logger.info( "Scanning %d amendment files from %s", len(amendment_paths), congress_dir.name, ) for chunk in _chunked(amendment_paths, PARALLEL_FILE_CHUNK_SIZE): results = parallelize_thread( _read_json_path, [{"path": path} for path in chunk], progress_tracker=PARALLEL_PROGRESS_TRACKER, ) for amendment_path, raw in zip(chunk, results.results, strict=True): if raw is None: continue amendment = _parse_amendment_json( session, raw=raw, bill_map=bill_map, ingest_run_id=ingest_run_id, path=amendment_path, ) if amendment is not None: session.add(amendment) session.commit() def _parse_amendment_json( session: Session, *, raw: dict[str, Any], bill_map: dict[tuple[int, str, int], int], ingest_run_id: int | None, path: Path, ) -> Amendment | None: congress = raw.get("congress") amendment_type = raw.get("amendment_type") or raw.get("type") number = raw.get("number") if congress is None or amendment_type is None or number is None: return None artifact = register_source_artifact( session, path=path, source_kind="amendment_json", congress=int(congress), chamber=normalize_chamber(raw.get("chamber")), ingest_run_id=ingest_run_id, ) amended_bill_id = None amended_bill_ref = raw.get("amends_bill") or raw.get("bill") or raw.get("amended_bill") if isinstance(amended_bill_ref, dict): amended_bill_key = raw_bill_key_from_ref( amended_bill_ref, default_congress=int(congress), ) if amended_bill_key is not None: amended_bill_id = bill_map.get(amended_bill_key) amendment = Amendment( congress=int(congress), amendment_type=str(amendment_type).lower(), number=int(number), chamber=normalize_chamber(raw.get("chamber")), description=raw.get("description"), purpose=raw.get("purpose"), amended_bill_id=amended_bill_id, source_path=str(path), source_artifact_id=artifact.id, ) session.add(amendment) session.flush() actions = raw.get("actions") if isinstance(actions, list): for index, item in enumerate(actions, start=1): if not isinstance(item, dict): continue action_date = parse_date_like(item.get("acted_at") or item.get("action_date")) action_text = item.get("text") if action_date is None or not isinstance(action_text, str) or not action_text: continue action = AmendmentAction( amendment_id=amendment.id, sequence=index, action_date=action_date, action_time=_extract_time_component(item.get("acted_at")), action_text=action_text, action_type=item.get("type"), action_code=item.get("state") or item.get("vote_type"), source_system_code=None, source_system_name="unitedstates/congress amendment JSON", source_artifact_id=artifact.id, ) session.add(action) session.flush() roll = item.get("roll") chamber = normalize_chamber(item.get("where")) session_number = item.get("session") if roll and chamber and session_number: session.add( AmendmentActionRecordedVote( amendment_action_id=action.id, congress=int(congress), chamber=chamber, session_number=int(session_number), roll_number=int(roll), vote_datetime=parse_datetime_like(item.get("acted_at")), vote_url=item.get("url"), ) ) return amendment def _extract_time_component(raw_value: Any) -> str | None: if raw_value is None: return None text = str(raw_value) if "T" not in text: return None return text.split("T", 1)[1].replace("Z", "") def _read_json(path: Path) -> dict[str, Any] | None: import orjson try: return orjson.loads(path.read_bytes()) except Exception: logger.exception("Failed to parse %s", path) return None def build_vote_action_matches( session: Session, *, congress_numbers: Sequence[int], ) -> None: """Match raw votes to official bill/amendment actions and persist all candidates.""" vote_ids_subquery = select(Vote.id).where(Vote.congress.in_(list(congress_numbers))) has_votes = session.scalar(select(Vote.id).where(Vote.congress.in_(list(congress_numbers))).limit(1)) if has_votes is None: return session.execute( delete(VoteActionMatch).where(VoteActionMatch.vote_id.in_(vote_ids_subquery)) ) session.execute( delete(VoteContextAudit).where(VoteContextAudit.vote_id.in_(vote_ids_subquery)) ) session.commit() vote_stmt = select(Vote).where(Vote.congress.in_(list(congress_numbers))).order_by(Vote.id) votes = session.scalars(vote_stmt).all() bill_vote_index = _build_bill_action_vote_index(session, congress_numbers) amendment_vote_index = _build_amendment_action_vote_index(session, congress_numbers) for vote in votes: candidates = rank_action_candidates( vote=vote, bill_vote_index=bill_vote_index, amendment_vote_index=amendment_vote_index, ) if not candidates: session.add( VoteContextAudit( vote_id=vote.id, step="vote_action_match", message="no official action matched vote tuple; classification will fall back to vote XML", severity="warning", ) ) continue selected = candidates[0] if len(candidates) > 1: session.add( VoteContextAudit( vote_id=vote.id, step="vote_action_match", message="multiple official actions matched vote tuple; selected highest-ranked candidate", severity="info", ) ) for index, candidate in enumerate(candidates): session.add( VoteActionMatch( vote_id=vote.id, action_scope=candidate.scope, bill_action_id=candidate.bill_action.id if candidate.bill_action else None, amendment_action_id=( candidate.amendment_action.id if candidate.amendment_action else None ), is_selected=index == 0, match_method=candidate.match_method, match_reason=candidate.match_reason, match_confidence=candidate.match_confidence, ) ) session.commit() def _build_bill_action_vote_index( session: Session, congress_numbers: Sequence[int], ) -> dict[tuple[int, str, int, int], list[BillActionRecordedVote]]: rows = session.scalars( select(BillActionRecordedVote) .join(BillAction, BillAction.id == BillActionRecordedVote.bill_action_id) .join(Bill, Bill.id == BillAction.bill_id) .where(Bill.congress.in_(list(congress_numbers))) .options(joinedload(BillActionRecordedVote.bill_action).joinedload(BillAction.bill)) ).all() index: dict[tuple[int, str, int, int], list[BillActionRecordedVote]] = {} for row in rows: key = (row.congress, row.chamber, row.session_number, row.roll_number) index.setdefault(key, []).append(row) return index def _build_amendment_action_vote_index( session: Session, congress_numbers: Sequence[int], ) -> dict[tuple[int, str, int, int], list[AmendmentActionRecordedVote]]: rows = session.scalars( select(AmendmentActionRecordedVote) .join(AmendmentAction, AmendmentAction.id == AmendmentActionRecordedVote.amendment_action_id) .join(Amendment, Amendment.id == AmendmentAction.amendment_id) .where(Amendment.congress.in_(list(congress_numbers))) .options( joinedload(AmendmentActionRecordedVote.amendment_action).joinedload( AmendmentAction.amendment ) ) ).all() index: dict[tuple[int, str, int, int], list[AmendmentActionRecordedVote]] = {} for row in rows: key = (row.congress, row.chamber, row.session_number, row.roll_number) index.setdefault(key, []).append(row) return index def rank_action_candidates( *, vote: Vote, bill_vote_index: dict[tuple[int, str, int, int], list[BillActionRecordedVote]], amendment_vote_index: dict[ tuple[int, str, int, int], list[AmendmentActionRecordedVote] ], ) -> list[ActionCandidate]: """Rank candidate official actions for one vote.""" key = (vote.congress, vote.chamber, vote.session_number, vote.roll_number) bill_candidates = bill_vote_index.get(key, []) amendment_candidates = amendment_vote_index.get(key, []) prefer_amendment = has_amendment_signal( vote.question, vote.result_text, raw_amendment_ref=vote.raw_amendment_ref, ) question_text = normalized_text(vote.question, vote.result_text, vote.vote_type) candidates: list[ActionCandidate] = [] for row in amendment_candidates: action_text = normalized_text(row.amendment_action.action_text, vote.question) score = 100 if prefer_amendment: score += 50 if is_direct_amendment_text_question(action_text): score += 15 if row.amendment_action.action_time: score += 1 candidates.append( ActionCandidate( scope=VoteActionScope.AMENDMENT, bill_action=None, amendment_action=row.amendment_action, score=score, match_method="canonical_vote_tuple", match_reason="matched amendment action recorded vote tuple", match_confidence=ConfidenceLevel.HIGH, ) ) for row in bill_candidates: source_name = row.bill_action.source_system_name or "" score = 50 if not prefer_amendment: score += 20 if "library of congress" not in source_name.casefold(): score += 10 if _semantic_alignment_score(question_text, row.bill_action.action_text) > 0: score += 10 if row.bill_action.action_time: score += 1 candidates.append( ActionCandidate( scope=VoteActionScope.BILL, bill_action=row.bill_action, amendment_action=None, score=score, match_method="canonical_vote_tuple", match_reason="matched bill action recorded vote tuple", match_confidence=ConfidenceLevel.HIGH, ) ) candidates.sort( key=lambda candidate: ( -candidate.score, candidate.bill_action.sequence if candidate.bill_action else candidate.amendment_action.sequence, ) ) return candidates def _semantic_alignment_score(question_text: str, action_text: str) -> int: normalized_action = normalized_text(action_text) if question_text and normalized_action and question_text in normalized_action: return 5 if is_direct_measure_text_question(question_text) and is_direct_measure_text_question(normalized_action): return 3 if is_procedural_question(question_text) and is_procedural_question(normalized_action): return 3 return 0 def classify_votes( session: Session, *, congress_numbers: Sequence[int], bill_map: dict[tuple[int, str, int], int], ) -> None: """Populate vote classifications and measure links from selected matches.""" has_votes = session.scalar(select(Vote.id).where(Vote.congress.in_(list(congress_numbers))).limit(1)) if has_votes is None: return vote_ids_subquery = select(Vote.id).where(Vote.congress.in_(list(congress_numbers))) session.execute( delete(VoteMeasureLink).where(VoteMeasureLink.vote_id.in_(vote_ids_subquery)) ) session.execute( delete(VoteClassification).where(VoteClassification.vote_id.in_(vote_ids_subquery)) ) session.commit() vote_stmt = ( select(Vote) .where(Vote.congress.in_(list(congress_numbers))) .options( selectinload(Vote.action_matches).joinedload(VoteActionMatch.bill_action).joinedload(BillAction.bill), selectinload(Vote.action_matches) .joinedload(VoteActionMatch.amendment_action) .joinedload(AmendmentAction.amendment), ) .order_by(Vote.id) ) votes = session.scalars(vote_stmt).all() for vote in votes: selected_match = next((match for match in vote.action_matches if match.is_selected), None) classification, measure_links, audit_rows = classify_single_vote( vote=vote, selected_match=selected_match, bill_map=bill_map, session=session, ) session.add(classification) for link in measure_links: session.add(link) for audit_row in audit_rows: session.add(audit_row) session.commit() def classify_single_vote( *, vote: Vote, selected_match: VoteActionMatch | None, bill_map: dict[tuple[int, str, int], int], session: Session, ) -> tuple[VoteClassification, list[VoteMeasureLink], list[VoteContextAudit]]: """Classify a single vote and produce measure links.""" audit_rows: list[VoteContextAudit] = [] question_text = vote.question or "" result_text = vote.result_text or "" selected_action_text = "" bill: Bill | None = None amendment: Amendment | None = None method = ClassificationMethod.VOTE_XML_ONLY confidence = ConfidenceLevel.MEDIUM if selected_match is not None: if selected_match.bill_action is not None: bill = selected_match.bill_action.bill if selected_match.amendment_action is not None: amendment = selected_match.amendment_action.amendment selected_action_text = ( selected_match.bill_action.action_text if selected_match.bill_action is not None else selected_match.amendment_action.action_text if selected_match.amendment_action is not None else "" ) method = ( ClassificationMethod.RECORDED_VOTE_ACTION_EXACT if len(vote.action_matches) <= 1 else ClassificationMethod.RECORDED_VOTE_ACTION_DUPLICATE_SOURCE_DEDUPED ) confidence = ConfidenceLevel.HIGH if bill is None and vote.raw_bill_ref: raw_key = raw_bill_key_from_ref(vote.raw_bill_ref, default_congress=vote.congress) if raw_key is not None: raw_bill_id = bill_map.get(raw_key) if raw_bill_id is not None: bill = session.get(Bill, raw_bill_id) subject_type = SubjectType.UNKNOWN vote_relationship = VoteRelationship.UNKNOWN measure_subtype: MeasureSubtype | None = None measure_function: MeasureFunction | None = None measure_type: str | None = None is_legislation_related = False is_direct_text = False is_substantive = False is_lawmaking_vehicle = False is_special_rule = False measure_links: list[VoteMeasureLink] = [] if vote.raw_nomination_ref or "nomination" in normalized_text(question_text, result_text): subject_type = SubjectType.NOMINATION vote_relationship = VoteRelationship.NON_LEGISLATIVE elif vote.raw_treaty_ref or "treaty" in normalized_text(question_text, result_text): subject_type = SubjectType.TREATY vote_relationship = VoteRelationship.NON_LEGISLATIVE elif is_non_legislative_question(question_text, result_text): subject_type = SubjectType.CHAMBER_ADMIN vote_relationship = VoteRelationship.NON_LEGISLATIVE elif amendment is not None or has_amendment_signal(question_text, selected_action_text, raw_amendment_ref=vote.raw_amendment_ref): subject_type = SubjectType.AMENDMENT is_legislation_related = True if is_direct_amendment_text_question(question_text, selected_action_text): vote_relationship = VoteRelationship.AMENDMENT_TEXT_VOTE is_direct_text = True is_substantive = True else: vote_relationship = VoteRelationship.PROCEDURAL_RELATED_TO_AMENDMENT if amendment is not None and amendment.amended_bill_id is not None: role = ( VoteMeasureRole.AMENDS if vote_relationship is VoteRelationship.AMENDMENT_TEXT_VOTE else VoteMeasureRole.PROCEDURAL_TARGET ) measure_links.append( VoteMeasureLink( vote_id=vote.id, measure_id=amendment.amended_bill_id, role=role, source=method.value, confidence=confidence, notes=amendment.purpose, ) ) elif bill is not None or vote.raw_bill_ref: subject_type = SubjectType.MEASURE is_legislation_related = True if bill is not None: measure_type = measure_type_value(bill.bill_type) measure_subtype = measure_subtype_for_bill_type(bill.bill_type) measure_function = measure_function_for_vote( bill=bill, question=question_text, action_text=selected_action_text, ) is_special_rule = measure_function is MeasureFunction.SPECIAL_RULE is_lawmaking_vehicle = measure_subtype in { MeasureSubtype.BILL, MeasureSubtype.JOINT_RESOLUTION, MeasureSubtype.CONCURRENT_RESOLUTION, } if is_direct_measure_text_question(question_text, selected_action_text): vote_relationship = VoteRelationship.DIRECT_TEXT_VOTE is_direct_text = True is_substantive = not is_special_rule and measure_function not in { MeasureFunction.CHAMBER_INTERNAL, MeasureFunction.COMMEMORATIVE_OR_SENSE_OF, } elif is_procedural_question(question_text, selected_action_text): vote_relationship = VoteRelationship.PROCEDURAL_RELATED_TO_MEASURE else: vote_relationship = VoteRelationship.UNKNOWN if bill is not None: role = ( VoteMeasureRole.VOTED_ON if vote_relationship is VoteRelationship.DIRECT_TEXT_VOTE else VoteMeasureRole.PROCEDURAL_TARGET ) measure_links.append( VoteMeasureLink( vote_id=vote.id, measure_id=bill.id, role=role, source=method.value, confidence=confidence, notes=None, ) ) if is_special_rule: underlying_refs = parse_measure_references( " ".join( filter( None, [bill.title, bill.title_short, bill.official_title, selected_action_text, question_text], ) ), congress=vote.congress, ) seen_measure_ids: set[int] = {bill.id} for key in underlying_refs: linked_bill_id = bill_map.get(key) if linked_bill_id is None or linked_bill_id in seen_measure_ids: continue seen_measure_ids.add(linked_bill_id) measure_links.append( VoteMeasureLink( vote_id=vote.id, measure_id=linked_bill_id, role=VoteMeasureRole.RULE_FOR, source="measure_text_parse", confidence=ConfidenceLevel.MEDIUM, notes="parsed from rule title/question/action text", ) ) if len(seen_measure_ids) <= 1: audit_rows.append( VoteContextAudit( vote_id=vote.id, step="vote_context_classify", message="special rule detected but no underlying measure could be resolved from available text", severity="warning", ) ) else: audit_rows.append( VoteContextAudit( vote_id=vote.id, step="vote_context_classify", message="vote remains unclassified after action matching and raw-source parsing", severity="warning", ) ) classification = VoteClassification( vote_id=vote.id, subject_type=subject_type, measure_type=measure_type, measure_subtype=measure_subtype, measure_function=measure_function, vote_relationship=vote_relationship, is_legislation_related=is_legislation_related, is_direct_vote_on_legislative_text=is_direct_text, is_substantive_policy_vote=is_substantive, is_lawmaking_vehicle=is_lawmaking_vehicle, is_special_rule=is_special_rule, classification_method=method, classification_confidence_reason=( "matched recorded vote tuple to official action" if selected_match is not None else "classified from raw vote metadata only" ), confidence=confidence, classified_at=datetime.now(UTC), classification_version=CLASSIFICATION_VERSION, ) return classification, measure_links, audit_rows def resolve_vote_text_targets( session: Session, *, congress_numbers: Sequence[int], ) -> None: """Populate voted/resulting text targets for classified votes.""" has_votes = session.scalar(select(Vote.id).where(Vote.congress.in_(list(congress_numbers))).limit(1)) if has_votes is None: return vote_ids_subquery = select(Vote.id).where(Vote.congress.in_(list(congress_numbers))) session.execute( delete(VoteTextTarget).where(VoteTextTarget.vote_id.in_(vote_ids_subquery)) ) session.commit() vote_stmt = ( select(Vote) .where(Vote.congress.in_(list(congress_numbers))) .options( joinedload(Vote.classification), selectinload(Vote.vote_measure_links).joinedload(VoteMeasureLink.measure).selectinload(Bill.bill_texts), selectinload(Vote.action_matches).joinedload(VoteActionMatch.bill_action).joinedload(BillAction.bill), selectinload(Vote.action_matches) .joinedload(VoteActionMatch.amendment_action) .joinedload(AmendmentAction.amendment), ) .order_by(Vote.id) ) votes = session.scalars(vote_stmt).all() for vote in votes: if vote.classification is None: continue selected_match = next((match for match in vote.action_matches if match.is_selected), None) text_target = resolve_text_target_for_vote(vote=vote, selected_match=selected_match) session.add(text_target) session.commit() def resolve_text_target_for_vote( *, vote: Vote, selected_match: VoteActionMatch | None, ) -> VoteTextTarget: """Resolve one vote's official text target.""" classification = vote.classification assert classification is not None if classification.subject_type is SubjectType.AMENDMENT: related_amendment_id = ( selected_match.amendment_action.amendment_id if selected_match and selected_match.amendment_action is not None else None ) return VoteTextTarget( vote_id=vote.id, text_target_type=TextTargetType.AMENDMENT_TEXT, voted_text_version_id=None, resulting_text_version_id=None, related_amendment_id=related_amendment_id, text_target_basis=TextTargetBasis.AMENDMENT_TEXT, text_resolution_method=TextResolutionMethod.AMENDMENT_TEXT_UNMODELED_PHASE1, text_resolution_confidence_reason="phase 1 does not store amendment text artifacts", confidence=classification.confidence, notes=None, ) if ( classification.subject_type is not SubjectType.MEASURE or not classification.is_direct_vote_on_legislative_text ): return VoteTextTarget( vote_id=vote.id, text_target_type=TextTargetType.NONE, voted_text_version_id=None, resulting_text_version_id=None, related_amendment_id=None, text_target_basis=TextTargetBasis.NO_TEXT_TARGET, text_resolution_method=TextResolutionMethod.NO_TEXT_TARGET, text_resolution_confidence_reason="vote was not a direct vote on legislative text", confidence=classification.confidence, notes=None, ) voted_on_measure = next( ( link.measure for link in vote.vote_measure_links if link.role is VoteMeasureRole.VOTED_ON ), None, ) if voted_on_measure is None: return VoteTextTarget( vote_id=vote.id, text_target_type=TextTargetType.UNKNOWN, voted_text_version_id=None, resulting_text_version_id=None, related_amendment_id=None, text_target_basis=TextTargetBasis.UNKNOWN, text_resolution_method=TextResolutionMethod.UNKNOWN, text_resolution_confidence_reason="no voted_on measure link exists for direct text vote", confidence=ConfidenceLevel.LOW, notes=None, ) action_date = None action_text = "" if selected_match is not None: if selected_match.bill_action is not None: action_date = selected_match.bill_action.action_date action_text = selected_match.bill_action.action_text elif selected_match.amendment_action is not None: action_date = selected_match.amendment_action.action_date action_text = selected_match.amendment_action.action_text if action_date is None: action_date = legislative_date_for_comparison( vote_datetime=vote.vote_datetime, fallback_date=vote.vote_date, ) candidate_texts = list(voted_on_measure.bill_texts) preferred_codes = preferred_bill_text_codes( vote=vote, bill=voted_on_measure, classification=classification, action_text=action_text, ) voted_text, method, basis = choose_best_bill_text( candidate_texts=candidate_texts, action_date=action_date, preferred_codes=preferred_codes, classification=classification, ) resulting_text = determine_resulting_text_version( candidate_texts=candidate_texts, action_date=action_date, action_text=action_text, voted_text=voted_text, ) text_target_type = ( TextTargetType.RESOLUTION_TEXT if classification.measure_subtype is not MeasureSubtype.BILL else TextTargetType.BILL_TEXT ) if classification.is_special_rule: basis = TextTargetBasis.RULE_RESOLUTION_TEXT confidence = classification.confidence if voted_text is not None else ConfidenceLevel.LOW reason = "resolved from official bill text versions" if voted_text is not None else "no eligible bill text version found" return VoteTextTarget( vote_id=vote.id, text_target_type=text_target_type, voted_text_version_id=voted_text.id if voted_text is not None else None, resulting_text_version_id=resulting_text.id if resulting_text is not None else None, related_amendment_id=None, text_target_basis=basis, text_resolution_method=method, text_resolution_confidence_reason=reason, confidence=confidence, notes=None, ) def preferred_bill_text_codes( *, vote: Vote, bill: Bill, classification: VoteClassification, action_text: str | None, ) -> tuple[str, ...]: """Preferred text-version codes for the vote's immediate measure.""" action = normalized_text(vote.question, action_text, vote.result_text) origin_is_house = is_house_origin_measure(bill) origin_is_senate = is_senate_origin_measure(bill) if classification.is_special_rule: return ("ath", "ats", "eh", "es", "cph", "cps") if "conference report" in action: return ("enr", "eah", "eas", "eh", "es") if "concur" in action or "with an amendment" in action or "agreed to senate amendments" in action: return ("eah", "eas", "enr") if vote.chamber == "House" and origin_is_house: return ("eh", "cph") if vote.chamber == "Senate" and origin_is_senate: return ("es", "cps") if "without amendment" in action: if vote.chamber == "Senate" and origin_is_house: return ("rfs", "rds", "eh", "cph") if vote.chamber == "House" and origin_is_senate: return ("rfh", "rdh", "es", "cps") if vote.chamber == "House": return ("eah", "eh", "cph") return ("eas", "es", "cps") def choose_best_bill_text( *, candidate_texts: Sequence[BillText], action_date: date | None, preferred_codes: Sequence[str], classification: VoteClassification, ) -> tuple[BillText | None, TextResolutionMethod, TextTargetBasis]: """Resolve the best official text version for a direct measure vote.""" if not candidate_texts: return None, TextResolutionMethod.UNKNOWN, TextTargetBasis.UNKNOWN preferred_code_set = tuple(code.lower() for code in preferred_codes) eligible = [ bill_text for bill_text in candidate_texts if action_date is None or bill_text.date is None or bill_text.date <= action_date ] if not eligible: eligible = list(candidate_texts) def sort_key(bill_text: BillText) -> tuple[int, int, date, int]: code = bill_text.version_code.lower() exact_date = int(action_date is not None and bill_text.date == action_date) code_rank = -preferred_code_set.index(code) if code in preferred_code_set else -999 bill_date = bill_text.date or date.min return (exact_date, code_rank, bill_date.toordinal(), bill_text.id) best = max(eligible, key=sort_key) code = best.version_code.lower() if action_date is not None and best.date == action_date and code in preferred_code_set: return ( best, TextResolutionMethod.TEXT_EXACT_ACTION_DATE_AND_CODE, TextTargetBasis.EXACT_ACTION_TEXT_VERSION, ) if action_date is not None and best.date == action_date: return ( best, TextResolutionMethod.TEXT_EXACT_ACTION_DATE_WRONG_CODE, TextTargetBasis.EXACT_ACTION_TEXT_VERSION, ) if code in preferred_code_set: basis = ( TextTargetBasis.RECEIVED_PRIOR_CHAMBER_VERSION if code in {"rfh", "rdh", "rfs", "rds"} else TextTargetBasis.RESULTING_ENGROSSED_VERSION ) method = ( TextResolutionMethod.TEXT_RECEIVED_PRIOR_CHAMBER_VERSION if basis is TextTargetBasis.RECEIVED_PRIOR_CHAMBER_VERSION else TextResolutionMethod.TEXT_PRIOR_VERSION_CODE_MATCH ) return best, method, basis return ( best, TextResolutionMethod.TEXT_PRIOR_VERSION_CODE_MATCH, TextTargetBasis.RESULTING_ENGROSSED_VERSION, ) def determine_resulting_text_version( *, candidate_texts: Sequence[BillText], action_date: date | None, action_text: str | None, voted_text: BillText | None, ) -> BillText | None: """Resolve a resulting/enrolled text version without overwriting the voted text.""" if voted_text is None: return None action = normalized_text(action_text) if not action: return voted_text if any(pattern in action for pattern in ("without amendment", "conference report", "agreed to senate amendment", "agreed to house amendment")): enrolled = sorted( ( bill_text for bill_text in candidate_texts if bill_text.version_code.lower() == "enr" and (action_date is None or bill_text.date is None or bill_text.date >= action_date) ), key=lambda bill_text: (bill_text.date or date.max, bill_text.id), ) if enrolled: return enrolled[0] return voted_text def resolve_vote_position_meanings( session: Session, *, congress_numbers: Sequence[int], ) -> None: """Populate yea/nay/present semantic effects for each classified vote.""" has_votes = session.scalar(select(Vote.id).where(Vote.congress.in_(list(congress_numbers))).limit(1)) if has_votes is None: return session.execute( delete(VotePositionMeaning).where( VotePositionMeaning.vote_id.in_( select(Vote.id).where(Vote.congress.in_(list(congress_numbers))) ) ) ) session.commit() vote_stmt = ( select(Vote) .where(Vote.congress.in_(list(congress_numbers))) .options(joinedload(Vote.classification)) .order_by(Vote.id) ) votes = session.scalars(vote_stmt).all() for vote in votes: if vote.classification is None: continue session.add(resolve_vote_position_meaning_for_vote(vote=vote)) session.commit() def resolve_vote_position_meaning_for_vote(*, vote: Vote) -> VotePositionMeaning: """Map Yea/Nay/Present into semantic effects for one classified vote.""" classification = vote.classification assert classification is not None relationship = classification.vote_relationship normalized_question = normalized_text(vote.question, vote.result_text) yea_effect = VoteEffect.UNKNOWN nay_effect = VoteEffect.UNKNOWN present_effect = VoteEffect.UNKNOWN confidence = classification.confidence method = "classification_relationship" if relationship in { VoteRelationship.DIRECT_TEXT_VOTE, VoteRelationship.AMENDMENT_TEXT_VOTE, }: yea_effect = VoteEffect.SUPPORTS_TEXT nay_effect = VoteEffect.OPPOSES_TEXT elif relationship is VoteRelationship.PROCEDURAL_RELATED_TO_MEASURE or relationship is VoteRelationship.PROCEDURAL_RELATED_TO_AMENDMENT: if "motion to table" in normalized_question: yea_effect = VoteEffect.BLOCKS_PROCEDURE nay_effect = VoteEffect.ADVANCES_PROCEDURE elif any(token in normalized_question for token in ("cloture", "motion to proceed", "previous question", "ordering the previous question")): yea_effect = VoteEffect.ADVANCES_PROCEDURE nay_effect = VoteEffect.BLOCKS_PROCEDURE else: confidence = ConfidenceLevel.LOW method = "classification_relationship_unknown_procedural_polarity" else: confidence = ConfidenceLevel.LOW method = "non_legislative_or_unknown" return VotePositionMeaning( vote_id=vote.id, yea_effect=yea_effect, nay_effect=nay_effect, present_effect=present_effect, polarity_confidence=confidence, polarity_method=method, notes=None, ) def create_score_run(session: Session) -> ScoreRun: """Create a score run tied to the most recent ingest snapshot when available.""" latest_ingest_run_id = session.scalar( select(IngestRun.id).order_by(IngestRun.id.desc()).limit(1) ) score_run = ScoreRun( ingest_run_id=latest_ingest_run_id, classifier_version=CLASSIFICATION_VERSION, scoring_version=SCORING_VERSION, included_vote_count=0, excluded_vote_count=0, started_at=datetime.now(UTC), completed_at=None, ) session.add(score_run) session.flush() return score_run def finalize_score_run( session: Session, *, score_run: ScoreRun, included_vote_count: int, excluded_vote_count: int, ) -> None: """Mark a score run complete.""" score_run.included_vote_count = included_vote_count score_run.excluded_vote_count = excluded_vote_count score_run.completed_at = datetime.now(UTC) session.flush()