File: /home/clawbot-insurance/Skills/claims/fraud-detector/fraud_detector.py

#!/usr/bin/env python3
"""Fraud Detector Skill

Analyzes claims for potential fraud indicators.
"""
import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional


class FraudDetector:
    """Rule-based fraud screening for claims stored as JSON files on disk.

    Claims are read from ``<memory_dir>/Claims/<claim_id>/metadata.json`` and
    policies from ``<memory_dir>/Policies/<dir>/policy-data.json``.  The
    screening result is written to
    ``<memory_dir>/Claims/<claim_id>/analysis/fraud-screening.json``.

    ``FRAUD_INDICATORS`` is a catalogue of known red flags; ``analyze_claim``
    currently evaluates only three of them automatically: recent policy
    inception, multiple claims within a year, and round-dollar estimates.
    """

    # Fraud indicators by category
    FRAUD_INDICATORS = {
        "claimant_behavior": [
            "reluctant_to_provide_information",
            "excessive_knowledge_of_insurance_process",
            "pressure_to_settle_quickly",
            "recent_policy_inception",
            "multiple_claims_history"
        ],
        "claim_characteristics": [
            "no_police_report_for_major_accident",
            "no_witnesses_available",
            "round_dollar_amounts",
            "excessive_damage_for_reported_cause",
            "pre_existing_damage"
        ],
        "medical_red_flags": [
            "excessive_treatment_for_minor_injuries",
            "treatment_by_questionable_providers",
            "diagnostic_tests_without_injury",
            "soft_tissue_injuries_only",
            "delayed_treatment_reporting"
        ],
        "property_red_flags": [
            "recently_acquired_high_value_items",
            "no_proof_of_ownership",
            "inflated_repair_estimates",
            "contractor_not_licensed",
            "damage_inconsistent_with_weather"
        ]
    }

    # Storage root used when the caller does not supply ``memory_dir``.
    DEFAULT_MEMORY_DIR = "/home/clawbot-insurance/Memory"

    # A policy younger than this many days triggers "recent_policy_inception".
    RECENT_INCEPTION_DAYS = 30
    # Claims reported within this many days count as "recent".
    RECENT_CLAIM_WINDOW_DAYS = 365
    # More than this many recent claims triggers "multiple_claims_history".
    MAX_RECENT_CLAIMS = 2
    # Estimates that are a positive multiple of this are considered "round".
    ROUND_AMOUNT_STEP = 500

    # Points contributed by one indicator, before scaling by its confidence.
    SEVERITY_WEIGHTS = {"HIGH": 30, "MEDIUM": 15, "LOW": 5}

    # Score thresholds shared by the risk level and the recommendation.
    HIGH_RISK_SCORE = 70
    MEDIUM_RISK_SCORE = 40

    def __init__(self, insurer_id: str, memory_dir: Optional[Path] = None):
        """Create a detector.

        Args:
            insurer_id: Identifier of the insurer; stored on the instance but
                not otherwise used by this class.
            memory_dir: Root directory containing ``Claims`` and ``Policies``
                (``str`` or ``Path``).  Defaults to ``DEFAULT_MEMORY_DIR``.
        """
        self.insurer_id = insurer_id
        self.memory_dir = Path(
            self.DEFAULT_MEMORY_DIR if memory_dir is None else memory_dir
        )

    def analyze_claim(self, claim_id: str) -> Dict:
        """Analyze claim for fraud indicators and persist the result.

        Args:
            claim_id: Name of the claim directory under ``Claims``.

        Returns:
            A dict with ``claim_id``, ``analyzed_at``, ``fraud_score``,
            ``risk_level``, ``indicators_found`` and ``recommendation``.
            The same dict is written to ``analysis/fraud-screening.json``.

        Raises:
            FileNotFoundError: The claim's ``metadata.json`` does not exist.
            KeyError: The claim metadata has no ``policy_number``.
        """
        claim_dir = self.memory_dir / "Claims" / claim_id

        # Load claim data
        claim = self._read_json(claim_dir / "metadata.json")
        policy_number = claim["policy_number"]

        # Load policy history
        policy_history = self._get_policy_history(policy_number)

        indicators_found = []

        # Check 1: Recent policy inception
        policy = self._lookup_policy(policy_number)
        if policy:
            days_since_inception = self._days_since(policy.get("inception_date"))
            # A missing or unparseable date yields None and the check is skipped.
            if (days_since_inception is not None
                    and days_since_inception < self.RECENT_INCEPTION_DAYS):
                indicators_found.append({
                    "category": "claimant_behavior",
                    "indicator": "recent_policy_inception",
                    "description": f"Policy incepted only {days_since_inception} days ago",
                    "severity": "HIGH",
                    "confidence": 0.9
                })

        # Check 2: Multiple recent claims.  The history is every claim on disk
        # for this policy, so it normally includes the claim being analyzed.
        recent_claims = []
        for past_claim in policy_history:
            age_days = self._days_since(past_claim.get("reported_at"))
            if age_days is not None and age_days < self.RECENT_CLAIM_WINDOW_DAYS:
                recent_claims.append(past_claim)

        if len(recent_claims) > self.MAX_RECENT_CLAIMS:
            indicators_found.append({
                "category": "claimant_behavior",
                "indicator": "multiple_claims_history",
                "description": f"{len(recent_claims)} claims in past year",
                "severity": "MEDIUM",
                "confidence": 0.7
            })

        # Check 3: Round dollar amounts.  ``or {}`` tolerates an explicit null.
        estimate = (claim.get("intake_data") or {}).get("damage_estimate")
        if estimate and self._is_round_amount(estimate):
            indicators_found.append({
                "category": "claim_characteristics",
                "indicator": "round_dollar_amounts",
                "description": f"Damage estimate is round number: ${estimate}",
                "severity": "LOW",
                "confidence": 0.5
            })

        # Calculate overall fraud score
        fraud_score = self._calculate_fraud_score(indicators_found)

        # Generate analysis report
        analysis = {
            "claim_id": claim_id,
            "analyzed_at": datetime.now().isoformat(),
            "fraud_score": fraud_score,
            "risk_level": self._get_risk_level(fraud_score),
            "indicators_found": indicators_found,
            "recommendation": self._get_recommendation(fraud_score, indicators_found)
        }

        # Save analysis
        analysis_dir = claim_dir / "analysis"
        analysis_dir.mkdir(parents=True, exist_ok=True)

        with open(analysis_dir / "fraud-screening.json", "w", encoding="utf-8") as f:
            json.dump(analysis, f, indent=2)

        return analysis

    @staticmethod
    def _read_json(path: Path):
        """Return the parsed content of the UTF-8 JSON file at ``path``."""
        with open(path, encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def _days_since(timestamp) -> Optional[int]:
        """Return whole days elapsed since an ISO-8601 timestamp.

        Returns ``None`` when ``timestamp`` is not a non-empty string or cannot
        be parsed.  Timezone-aware values are compared against "now" in their
        own timezone, because subtracting an aware datetime from a naive one
        raises ``TypeError``.
        """
        if not isinstance(timestamp, str) or not timestamp.strip():
            return None

        text = timestamp.strip()
        # ``fromisoformat`` only accepts a trailing "Z" from Python 3.11 on.
        if text.endswith(("Z", "z")):
            text = text[:-1] + "+00:00"

        try:
            moment = datetime.fromisoformat(text)
        except ValueError:
            return None

        # tzinfo is None for naive values, which gives naive local "now".
        return (datetime.now(moment.tzinfo) - moment).days

    @classmethod
    def _is_round_amount(cls, estimate) -> bool:
        """Return True if ``estimate`` is a positive multiple of the round step.

        Accepts numbers as well as strings such as ``"5000"``, ``"1500.00"``
        or ``"$5,000"``.  Values that cannot be read as a number are not round.
        """
        if isinstance(estimate, bool):
            return False
        if isinstance(estimate, (int, float)):
            amount = float(estimate)
        else:
            cleaned = str(estimate).strip().replace(",", "").replace("$", "")
            try:
                amount = float(cleaned)
            except ValueError:
                return False
        # NaN and infinity fail the modulo comparison and are rejected.
        return amount > 0 and amount % cls.ROUND_AMOUNT_STEP == 0

    def _get_policy_history(self, policy_number: str) -> List[Dict]:
        """Get claim history for policy.

        Scans every ``Claims/<dir>/metadata.json`` and returns the records
        whose ``policy_number`` matches.  Returns an empty list when the
        ``Claims`` directory does not exist.  Invalid JSON still raises.
        """
        claims_dir = self.memory_dir / "Claims"
        if not claims_dir.is_dir():
            return []

        history = []
        # Sorted so the result order does not depend on the filesystem.
        for claim_dir in sorted(claims_dir.iterdir()):
            metadata_file = claim_dir / "metadata.json"
            if not (claim_dir.is_dir() and metadata_file.exists()):
                continue
            claim = self._read_json(metadata_file)
            if isinstance(claim, dict) and claim.get("policy_number") == policy_number:
                history.append(claim)

        return history

    def _lookup_policy(self, policy_number: str) -> Optional[Dict]:
        """Look up policy details.

        Returns the first ``Policies/<dir>/policy-data.json`` record (in
        sorted directory order) whose ``policy_number`` matches, or ``None``
        when there is no match or the ``Policies`` directory is missing.
        """
        policies_dir = self.memory_dir / "Policies"
        if not policies_dir.is_dir():
            return None

        for policy_dir in sorted(policies_dir.iterdir()):
            policy_file = policy_dir / "policy-data.json"
            if not (policy_dir.is_dir() and policy_file.exists()):
                continue
            policy = self._read_json(policy_file)
            if isinstance(policy, dict) and policy.get("policy_number") == policy_number:
                return policy

        return None

    def _calculate_fraud_score(self, indicators: List[Dict]) -> int:
        """Calculate overall fraud score (0-100).

        Each indicator contributes its severity weight multiplied by its
        confidence; unknown severities contribute nothing.  The sum is
        truncated to an int and clamped to the 0-100 range.
        """
        score = 0

        for indicator in indicators:
            weight = self.SEVERITY_WEIGHTS.get(indicator["severity"], 0)
            score += weight * indicator["confidence"]

        return max(0, min(int(score), 100))

    def _get_risk_level(self, score: int) -> str:
        """Get risk level ("HIGH", "MEDIUM" or "LOW") from score."""
        if score >= self.HIGH_RISK_SCORE:
            return "HIGH"
        if score >= self.MEDIUM_RISK_SCORE:
            return "MEDIUM"
        return "LOW"

    def _get_recommendation(self, score: int, indicators: List[Dict]) -> str:
        """Get recommended action for a score.

        ``indicators`` is accepted for interface compatibility but is not
        consulted; the recommendation depends on ``score`` alone.
        """
        if score >= self.HIGH_RISK_SCORE:
            return "ESCALATE_TO_SIU"
        if score >= self.MEDIUM_RISK_SCORE:
            return "ENHANCED_INVESTIGATION"
        return "STANDARD_PROCESSING"
# CLI interface
def main(argv=None) -> int:
    """Run fraud screening for one claim and print the result as JSON.

    Args:
        argv: Command-line arguments without the program name, expected as
            ``[insurer_id, claim_id]``.  Defaults to ``sys.argv[1:]``.

    Returns:
        Process exit code: 0 on success, 2 when arguments are missing.
    """
    import sys

    args = sys.argv[1:] if argv is None else list(argv)
    # Report missing arguments as a usage error instead of an IndexError.
    if len(args) < 2:
        print("Usage: fraud_detector.py <insurer_id> <claim_id>", file=sys.stderr)
        return 2

    insurer_id = args[0]
    claim_id = args[1]

    detector = FraudDetector(insurer_id)
    result = detector.analyze_claim(claim_id)

    print(json.dumps(result, indent=2))
    return 0


if __name__ == "__main__":
    import sys

    sys.exit(main())