File: /home/clawbot-insurance/Skills/claims/fraud-detector/fraud_detector.py
#!/usr/bin/env python3
"""
Fraud Detector Skill
Analyzes claims for potential fraud indicators
"""
import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional
class FraudDetector:
    """Screen insurance claims for a small set of fraud indicators.

    Claims and policies are read from JSON files under ``memory_dir``:

    * ``Claims/<claim_id>/metadata.json`` -- one file per claim
    * ``Policies/<dir>/policy-data.json`` -- one file per policy

    The screening result is written back next to the claim as
    ``Claims/<claim_id>/analysis/fraud-screening.json``.
    """

    # Fraud indicators by category
    FRAUD_INDICATORS = {
        "claimant_behavior": [
            "reluctant_to_provide_information",
            "excessive_knowledge_of_insurance_process",
            "pressure_to_settle_quickly",
            "recent_policy_inception",
            "multiple_claims_history",
        ],
        "claim_characteristics": [
            "no_police_report_for_major_accident",
            "no_witnesses_available",
            "round_dollar_amounts",
            "excessive_damage_for_reported_cause",
            "pre_existing_damage",
        ],
        "medical_red_flags": [
            "excessive_treatment_for_minor_injuries",
            "treatment_by_questionable_providers",
            "diagnostic_tests_without_injury",
            "soft_tissue_injuries_only",
            "delayed_treatment_reporting",
        ],
        "property_red_flags": [
            "recently_acquired_high_value_items",
            "no_proof_of_ownership",
            "inflated_repair_estimates",
            "contractor_not_licensed",
            "damage_inconsistent_with_weather",
        ],
    }

    # Storage root used when the caller does not supply one.
    DEFAULT_MEMORY_DIR = "/home/clawbot-insurance/Memory"

    # Thresholds used by the individual checks.
    RECENT_INCEPTION_DAYS = 30
    CLAIM_HISTORY_WINDOW_DAYS = 365
    MAX_CLAIMS_IN_WINDOW = 2
    ROUND_AMOUNT_STEP = 500

    # Points contributed by one indicator, before scaling by its confidence.
    SEVERITY_WEIGHTS = {"HIGH": 30, "MEDIUM": 15, "LOW": 5}

    # Score boundaries shared by the risk level and the recommendation.
    HIGH_RISK_SCORE = 70
    MEDIUM_RISK_SCORE = 40

    def __init__(self, insurer_id: str, memory_dir: Optional[Path] = None):
        """Create a detector.

        Args:
            insurer_id: Identifier of the insurer. It is stored on the
                instance; the storage path does not depend on it.
            memory_dir: Root directory of the claim/policy store (a ``Path``
                or a path string). Defaults to ``DEFAULT_MEMORY_DIR``.
        """
        self.insurer_id = insurer_id
        self.memory_dir = Path(
            self.DEFAULT_MEMORY_DIR if memory_dir is None else memory_dir
        )

    def analyze_claim(self, claim_id: str) -> Dict:
        """Analyze claim for fraud indicators and persist the result.

        Args:
            claim_id: Name of the claim directory under ``Claims``.

        Returns:
            A dict with ``claim_id``, ``analyzed_at``, ``fraud_score``,
            ``risk_level``, ``indicators_found`` and ``recommendation``.

        Raises:
            FileNotFoundError: If the claim's ``metadata.json`` is missing.
            KeyError: If the claim has no ``policy_number``.
            json.JSONDecodeError: If a claim or policy file is not valid JSON.
        """
        # Load claim data
        claim = self._read_json(
            self.memory_dir / f"Claims/{claim_id}/metadata.json"
        )
        policy_number = claim["policy_number"]

        # Load policy history and the policy record itself
        policy_history = self._get_policy_history(policy_number)
        policy = self._lookup_policy(policy_number)

        # Each check yields one indicator dict or None; order is preserved.
        candidates = (
            self._check_recent_inception(policy),
            self._check_claim_frequency(policy_history),
            self._check_round_amount(claim),
        )
        indicators_found = [hit for hit in candidates if hit is not None]

        # Calculate overall fraud score
        fraud_score = self._calculate_fraud_score(indicators_found)

        # Generate analysis report
        analysis = {
            "claim_id": claim_id,
            "analyzed_at": datetime.now().isoformat(),
            "fraud_score": fraud_score,
            "risk_level": self._get_risk_level(fraud_score),
            "indicators_found": indicators_found,
            "recommendation": self._get_recommendation(
                fraud_score, indicators_found
            ),
        }

        # Save analysis
        analysis_dir = self.memory_dir / f"Claims/{claim_id}/analysis"
        analysis_dir.mkdir(parents=True, exist_ok=True)
        report_path = analysis_dir / "fraud-screening.json"
        with open(report_path, "w", encoding="utf-8") as f:
            json.dump(analysis, f, indent=2)

        return analysis

    def _check_recent_inception(self, policy: Optional[Dict]) -> Optional[Dict]:
        """Flag a claim made within RECENT_INCEPTION_DAYS of policy start."""
        if not policy:
            return None
        # A missing or unparseable inception date is treated as "not recent".
        days_since_inception = self._days_since(policy.get("inception_date"))
        if days_since_inception is None:
            return None
        if days_since_inception >= self.RECENT_INCEPTION_DAYS:
            return None
        return {
            "category": "claimant_behavior",
            "indicator": "recent_policy_inception",
            "description": f"Policy incepted only {days_since_inception} days ago",
            "severity": "HIGH",
            "confidence": 0.9,
        }

    def _check_claim_frequency(self, policy_history: List[Dict]) -> Optional[Dict]:
        """Flag a policy with more than MAX_CLAIMS_IN_WINDOW recent claims."""
        recent_count = 0
        for past_claim in policy_history:
            # Claims without a usable report date cannot be placed in the
            # window, so they are left out of the count instead of crashing.
            age = self._days_since(past_claim.get("reported_at"))
            if age is not None and age < self.CLAIM_HISTORY_WINDOW_DAYS:
                recent_count += 1
        if recent_count <= self.MAX_CLAIMS_IN_WINDOW:
            return None
        return {
            "category": "claimant_behavior",
            "indicator": "multiple_claims_history",
            "description": f"{recent_count} claims in past year",
            "severity": "MEDIUM",
            "confidence": 0.7,
        }

    def _check_round_amount(self, claim: Dict) -> Optional[Dict]:
        """Flag a damage estimate that is a positive multiple of 500."""
        intake = claim.get("intake_data")
        if not isinstance(intake, dict):
            return None
        estimate = intake.get("damage_estimate")
        if not estimate:
            return None
        amount = self._parse_amount(estimate)
        # The chained comparison also rejects NaN and infinity.
        if amount is None or not (0 < amount < float("inf")):
            return None
        # A whole number ends in "000" or "500" exactly when it is a
        # multiple of 500, so this matches the intent for plain digit
        # strings and extends it to numbers and formatted amounts.
        if amount % self.ROUND_AMOUNT_STEP != 0:
            return None
        return {
            "category": "claim_characteristics",
            "indicator": "round_dollar_amounts",
            "description": f"Damage estimate is round number: ${estimate}",
            "severity": "LOW",
            "confidence": 0.5,
        }

    @staticmethod
    def _parse_amount(value) -> Optional[float]:
        """Convert a numeric or "$1,234.56"-style value to float, else None."""
        if isinstance(value, bool):
            return None
        if isinstance(value, (int, float)):
            return float(value)
        if isinstance(value, str):
            cleaned = value.strip().replace("$", "").replace(",", "")
            try:
                return float(cleaned)
            except ValueError:
                return None
        return None

    @staticmethod
    def _days_since(timestamp) -> Optional[int]:
        """Return whole days elapsed since an ISO-8601 timestamp, else None."""
        if not isinstance(timestamp, str):
            return None
        text = timestamp.strip()
        # fromisoformat() rejects a trailing "Z" before Python 3.11.
        if text.endswith(("Z", "z")):
            text = text[:-1] + "+00:00"
        try:
            moment = datetime.fromisoformat(text)
        except ValueError:
            return None
        # Build "now" with the same awareness as the parsed value; mixing
        # naive and aware datetimes in a subtraction raises TypeError.
        return (datetime.now(moment.tzinfo) - moment).days

    @staticmethod
    def _read_json(path: Path):
        """Load and return the JSON document stored at ``path``."""
        with open(path, encoding="utf-8") as f:
            return json.load(f)

    def _iter_records(self, subdir: str, filename: str):
        """Yield each dict found in ``<memory_dir>/<subdir>/*/<filename>``.

        Yields nothing when the directory is missing. Entries are visited in
        sorted order so results do not depend on filesystem ordering.
        """
        root = self.memory_dir / subdir
        if not root.is_dir():
            return
        for entry in sorted(root.iterdir()):
            if not entry.is_dir():
                continue
            record_file = entry / filename
            if not record_file.exists():
                continue
            record = self._read_json(record_file)
            if isinstance(record, dict):
                yield record

    def _get_policy_history(self, policy_number: str) -> List[Dict]:
        """Get claim history for policy.

        Returns every stored claim whose ``policy_number`` matches; the claim
        being analyzed is itself part of that list when it is stored there.
        """
        return [
            claim
            for claim in self._iter_records("Claims", "metadata.json")
            if claim.get("policy_number") == policy_number
        ]

    def _lookup_policy(self, policy_number: str) -> Optional[Dict]:
        """Look up policy details; return None when no policy matches."""
        for policy in self._iter_records("Policies", "policy-data.json"):
            if policy.get("policy_number") == policy_number:
                return policy
        return None

    def _calculate_fraud_score(self, indicators: List[Dict]) -> int:
        """Calculate overall fraud score (0-100).

        Each indicator adds its severity weight times its confidence; an
        unknown severity adds nothing. The sum is truncated to an int and
        capped at 100.
        """
        score = 0
        for indicator in indicators:
            severity = indicator["severity"]
            confidence = indicator["confidence"]
            weight = self.SEVERITY_WEIGHTS.get(severity)
            if weight is not None:
                score += weight * confidence
        return min(int(score), 100)

    def _get_risk_level(self, score: int) -> str:
        """Get risk level ("HIGH", "MEDIUM" or "LOW") from score."""
        if score >= self.HIGH_RISK_SCORE:
            return "HIGH"
        if score >= self.MEDIUM_RISK_SCORE:
            return "MEDIUM"
        return "LOW"

    def _get_recommendation(self, score: int, indicators: List[Dict]) -> str:
        """Get recommended action for a score.

        ``indicators`` is accepted for interface compatibility; the decision
        depends on the score only.
        """
        if score >= self.HIGH_RISK_SCORE:
            return "ESCALATE_TO_SIU"
        if score >= self.MEDIUM_RISK_SCORE:
            return "ENHANCED_INVESTIGATION"
        return "STANDARD_PROCESSING"
# CLI interface
if __name__ == "__main__":
    import sys

    # Usage: fraud_detector.py <insurer_id> <claim_id>
    # Both positional arguments are required; fail with a usage message
    # instead of an IndexError traceback when they are missing. Extra
    # arguments are ignored.
    if len(sys.argv) < 3:
        print(f"Usage: {sys.argv[0]} <insurer_id> <claim_id>", file=sys.stderr)
        sys.exit(2)

    insurer_id = sys.argv[1]
    claim_id = sys.argv[2]

    detector = FraudDetector(insurer_id)
    result = detector.analyze_claim(claim_id)
    # Emit the screening report as pretty-printed JSON on stdout.
    print(json.dumps(result, indent=2))