"""Credibility scoring and bot/disinfo detection.

Assigns a 0.0–1.0 credibility score to each post based on heuristic signals.
Posts below the configured threshold are excluded or flagged so they don't
pollute the sentiment analysis.

Signals are platform-aware — each platform has different indicators of
inauthentic behavior.
"""

from __future__ import annotations

import re
from bisect import bisect_left, bisect_right
from collections import Counter
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass
class CredibilityResult:
    """Credibility assessment for a single post."""

    score: float  # 0.0 (likely bot/disinfo) to 1.0 (likely authentic)
    flags: list[str] = field(default_factory=list)  # human-readable reasons
    is_excluded: bool = False  # below min_credibility_score
    is_flagged: bool = False  # between min and flag threshold

    @property
    def label(self) -> str:
        """Return a coarse, human-readable bucket for ``score``."""
        if self.score >= 0.7:
            return "likely_authentic"
        if self.score >= 0.5:
            return "uncertain"
        if self.score >= 0.3:
            return "suspicious"
        return "likely_inauthentic"


# --- Shared heuristics ---

# Common bot patterns in text
_BOT_TEXT_PATTERNS = [
    # Crypto/scam spam
    re.compile(r"(?i)(dm me|check my bio|link in bio|click here|free giveaway)"),
    re.compile(r"(?i)(join my|subscribe to|follow me for|🔥.*🔥.*🔥)"),
    # Astroturfing phrases
    re.compile(r"(?i)(i (just )?(discovered|found|tried) this (amazing|incredible|awesome))"),
    re.compile(r"(?i)(game.?changer|life.?changing|you won'?t believe)"),
    # Excessive hashtags (5+)
    re.compile(r"(#\w+\s*){5,}"),
    # Walls of emojis (10+ consecutive)
    re.compile(r"[\U0001F300-\U0001FAFF]{10,}"),
    # Repetitive characters (spammy emphasis)
    re.compile(r"(.)\1{9,}"),
]

# Coordinated campaign indicators: identical or near-identical text
# This is checked at the batch level, not per-post (see detect_coordination).

# Handles that start with a long hex/number run look machine-generated.
_RANDOM_HANDLE_RE = re.compile(r"^[a-f0-9]{8,}\.")
# Letters, optional separator, then 4+ digits.
_AUTO_USERNAME_RE = re.compile(r"^[A-Za-z]+[-_]?\d{4,}$")
# Placeholder author values that mark a deleted or removed account.
_DELETED_AUTHORS = frozenset({"[deleted]", "[removed]"})


def _str_field(record: dict, *keys: str) -> str:
    """Return the first non-empty string found under ``keys``, else ``""``.

    A key that is present but holds ``None`` (or any non-string) is treated
    like a missing key, so the regex checks never receive a non-string.
    """
    for key in keys:
        value = record.get(key)
        if isinstance(value, str) and value:
            return value
    return ""


def _num_field(record: dict, key: str, default: float = 0) -> float:
    """Return ``record[key]``, substituting ``default`` for missing or ``None``."""
    value = record.get(key)
    return default if value is None else value


def _clamped_result(score: float, flags: list[str]) -> CredibilityResult:
    """Build a ``CredibilityResult`` with ``score`` clamped into [0.0, 1.0]."""
    return CredibilityResult(score=max(0.0, min(1.0, score)), flags=flags)


def _check_text_patterns(text: str) -> list[str]:
    """Check text against common bot/spam patterns.

    Returns one flag per matching pattern (carrying the first 60 characters
    of the regex), plus ``very_short_text`` for texts under 15 characters.
    """
    flags = [
        f"bot_text_pattern: {pattern.pattern[:60]}"
        for pattern in _BOT_TEXT_PATTERNS
        if pattern.search(text)
    ]
    if len(text) < 15:
        flags.append("very_short_text")
    return flags


def _engagement_ratio_score(
    likes: int, reposts: int, replies: int
) -> tuple[float, list[str]]:
    """Score based on engagement ratios.

    Authentic posts tend to have a mix of likes, replies, and reposts.
    Bot-amplified posts often have inflated likes with very few replies,
    or massive repost counts with no discussion.

    Returns:
        ``(score, flags)`` where ``score`` is between 0.3 and 1.0.
    """
    total = likes + reposts + replies
    if total == 0:
        return 0.5, ["no_engagement"]

    # High repost-to-reply ratio suggests amplification without discussion
    if replies == 0 and reposts > 10:
        return 0.3, [f"high_repost_no_replies: {reposts} reposts, 0 replies"]

    # Extremely high like count with zero replies is suspicious
    if likes > 100 and replies == 0:
        return 0.4, [f"high_likes_no_replies: {likes} likes, 0 replies"]

    # Normal engagement: the larger the share of replies, the higher the score
    return min(1.0, 0.5 + (replies / max(total, 1)) * 0.5), []


def _reddit_author_signals(author: str) -> list[tuple[str, float]]:
    """Return ``(flag, penalty)`` pairs for a Reddit author name."""
    signals: list[tuple[str, float]] = []
    if author in _DELETED_AUTHORS:
        signals.append(("deleted_author", 0.2))
    # Suspicious username patterns (letters followed by a run of digits)
    if _AUTO_USERNAME_RE.match(author):
        signals.append((f"auto_generated_username: {author}", 0.15))
    return signals


# --- Platform-specific scoring ---


def score_bluesky_post(post: dict) -> CredibilityResult:
    """Score a Bluesky post for credibility.

    Reads ``text``, ``author_handle``, ``author_display_name``,
    ``like_count``, ``repost_count`` and ``reply_count`` from ``post``.
    Missing or ``None`` values fall back to empty text / zero counts.
    The final score blends the text/handle score (60%) with the
    engagement-ratio score (40%).
    """
    score = 1.0
    flags: list[str] = []

    text = _str_field(post, "text")
    handle = _str_field(post, "author_handle")
    display_name = _str_field(post, "author_display_name")
    likes = _num_field(post, "like_count")
    reposts = _num_field(post, "repost_count")
    replies = _num_field(post, "reply_count")

    # Text pattern checks
    text_flags = _check_text_patterns(text)
    if text_flags:
        score -= 0.15 * len(text_flags)
        flags.extend(text_flags)

    # Randomly generated handles (long hex/number strings)
    if _RANDOM_HANDLE_RE.match(handle):
        flags.append(f"random_handle: {handle}")
        score -= 0.3

    # No display name set
    if not display_name or display_name == handle:
        flags.append("no_display_name")
        score -= 0.1

    # Engagement ratio
    eng_score, eng_flags = _engagement_ratio_score(likes, reposts, replies)
    flags.extend(eng_flags)
    score = score * 0.6 + eng_score * 0.4

    return _clamped_result(score, flags)


def score_reddit_post(post: dict) -> CredibilityResult:
    """Score a Reddit post for credibility.

    Reads ``selftext`` (falling back to ``title``), ``author``,
    ``upvote_ratio``, ``score``, ``num_comments`` and ``is_self``.
    Missing or ``None`` values fall back to neutral defaults.
    """
    score = 1.0
    flags: list[str] = []

    text = _str_field(post, "selftext", "title")
    title = _str_field(post, "title")
    author = _str_field(post, "author")
    upvote_ratio = _num_field(post, "upvote_ratio", 0.5)
    post_score = _num_field(post, "score")
    num_comments = _num_field(post, "num_comments")

    # Text patterns
    text_flags = _check_text_patterns(text)
    if text_flags:
        score -= 0.15 * len(text_flags)
        flags.extend(text_flags)

    # Deleted author / auto-generated username
    for flag, penalty in _reddit_author_signals(author):
        flags.append(flag)
        score -= penalty

    # Very controversial ratio (lots of up AND down votes)
    if upvote_ratio < 0.4 and post_score > 0:
        flags.append(f"highly_controversial: {upvote_ratio:.0%} upvote ratio")
        score -= 0.1

    # High score but zero comments = potential vote manipulation
    if post_score > 100 and num_comments == 0:
        flags.append(f"high_score_no_comments: {post_score} score, 0 comments")
        score -= 0.2

    # Low-effort cross-post spam: very short title, external link, no selftext
    if (
        len(title) < 20
        and not post.get("is_self", True)
        and not post.get("selftext")
    ):
        flags.append("possible_link_spam")
        score -= 0.1

    return _clamped_result(score, flags)


def score_reddit_comment(comment: dict) -> CredibilityResult:
    """Score a Reddit comment for credibility.

    Reads ``body``, ``author`` and ``score``; missing or ``None`` values
    fall back to empty text / zero.
    """
    score = 1.0
    flags: list[str] = []

    body = _str_field(comment, "body")
    author = _str_field(comment, "author")
    comment_score = _num_field(comment, "score")

    text_flags = _check_text_patterns(body)
    if text_flags:
        score -= 0.15 * len(text_flags)
        flags.extend(text_flags)

    for flag, penalty in _reddit_author_signals(author):
        flags.append(flag)
        score -= penalty

    # Heavily downvoted
    if comment_score < -5:
        flags.append(f"heavily_downvoted: {comment_score}")
        score -= 0.15

    return _clamped_result(score, flags)


def score_hackernews_post(post: dict) -> CredibilityResult:
    """Score a HN story for credibility.

    HN is generally higher-signal than social media, but we still check
    for low-effort submissions and spammy patterns. Reads ``story_text``
    (falling back to ``title``) and ``points``.
    """
    score = 1.0
    flags: list[str] = []

    text = _str_field(post, "story_text", "title")
    points = _num_field(post, "points")

    text_flags = _check_text_patterns(text)
    if text_flags:
        score -= 0.1 * len(text_flags)
        flags.extend(text_flags)

    # Zero points = the community didn't find it valuable
    if points == 0:
        flags.append("zero_points")
        score -= 0.1

    # HN is generally more credible: apply a bonus, capped at 1.0
    score = min(1.0, score + 0.1)

    return _clamped_result(score, flags)


def score_hackernews_comment(comment: dict) -> CredibilityResult:
    """Score a HN comment for credibility, based on ``comment_text`` only."""
    score = 1.0
    flags: list[str] = []

    text = _str_field(comment, "comment_text")

    text_flags = _check_text_patterns(text)
    if text_flags:
        score -= 0.1 * len(text_flags)
        flags.extend(text_flags)

    # HN comments are generally higher quality: apply a bonus, capped at 1.0
    score = min(1.0, score + 0.1)

    return _clamped_result(score, flags)


# --- Batch-level coordination detection ---


def _duplicate_warnings(texts: list[str]) -> list[str]:
    """Warn when texts are identical after stripping and lower-casing."""
    counts = Counter(text.strip().lower() for text in texts)
    duplicates = {text: count for text, count in counts.items() if count > 1}
    if not duplicates:
        return []
    total_dupes = sum(duplicates.values())
    return [
        f"COORDINATION WARNING: {len(duplicates)} duplicate texts found "
        f"({total_dupes} total copies). Possible copy-paste campaign."
    ]


def _template_warnings(texts: list[str]) -> list[str]:
    """Warn when at least 30% of texts share the same leading 50-80 chars.

    This is a simplified near-duplicate check; it only runs on batches of
    five or more texts and ignores texts shorter than 50 characters.
    """
    if len(texts) < 5:
        return []
    prefix_counts = Counter(
        prefix
        for prefix in (text.strip().lower()[:80] for text in texts)
        if len(prefix) >= 50
    )
    threshold = len(texts) * 0.3
    return [
        f"COORDINATION WARNING: {count}/{len(texts)} posts share "
        f"a common prefix ({prefix[:50]}...). Possible template campaign."
        for prefix, count in prefix_counts.items()
        if count >= threshold
    ]


def _parse_timestamp(value: object) -> datetime | None:
    """Convert an ISO-8601 string or epoch number to an aware UTC-based datetime.

    Returns ``None`` for unparseable or unsupported values. Strings without
    an explicit offset are assumed to be UTC so that every returned value is
    timezone-aware and mutually comparable.
    """
    if isinstance(value, str):
        try:
            parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return None
    elif isinstance(value, (int, float)):
        try:
            parsed = datetime.fromtimestamp(value, tz=timezone.utc)
        except (OverflowError, OSError, ValueError):
            # Out-of-range epoch values are skipped rather than aborting the batch.
            return None
    else:
        return None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _burst_warnings(posts: list[dict]) -> list[str]:
    """Warn when at least half of the timestamped posts fall in a 5-minute window."""
    parsed = (
        _parse_timestamp(post.get("created_at") or post.get("created_utc"))
        for post in posts
    )
    timestamps = sorted(ts for ts in parsed if ts is not None)
    if len(timestamps) < 5:
        return []

    window = timedelta(seconds=300)
    threshold = len(timestamps) * 0.5
    for start in timestamps:
        # The list is sorted, so the posts inside [start, start + window]
        # form a contiguous slice whose bounds bisect finds in O(log n).
        in_window = bisect_right(timestamps, start + window) - bisect_left(
            timestamps, start
        )
        if in_window >= threshold:
            return [
                f"COORDINATION WARNING: {in_window}/{len(timestamps)} posts "
                f"appeared within a 5-minute window. Possible coordinated posting."
            ]
    return []


def detect_coordination(posts: list[dict], text_key: str = "text") -> list[str]:
    """Detect coordinated inauthentic behavior across a batch of posts.

    Looks for:
    - Duplicate text (copy-paste campaigns)
    - Many posts sharing a long common prefix (template campaigns)
    - Burst posting (many posts in a very short window)

    Args:
        posts: Post dicts. Text is read from ``text_key``; timestamps from
            ``created_at`` or ``created_utc`` (ISO-8601 string or epoch
            seconds).
        text_key: Name of the key holding each post's text.

    Returns:
        A list of warning strings (empty when nothing suspicious is found).
        Burst detection runs even when no post carries any text.
    """
    texts = [
        value
        for value in (post.get(text_key) for post in posts)
        if isinstance(value, str) and value
    ]
    warnings: list[str] = []
    warnings.extend(_duplicate_warnings(texts))
    warnings.extend(_template_warnings(texts))
    warnings.extend(_burst_warnings(posts))
    return warnings


def filter_and_annotate(
    posts: list[dict],
    scorer: Callable[[dict], CredibilityResult],
    min_score: float = 0.3,
    flag_threshold: float = 0.5,
) -> tuple[list[dict], dict]:
    """Score all posts, filter out low-credibility ones, and annotate the rest.

    Args:
        posts: List of post dicts from any platform.
        scorer: A scoring function (e.g., score_reddit_post).
        min_score: Posts below this are excluded.
        flag_threshold: Posts between min_score and this are flagged.

    Returns:
        Tuple of (filtered_posts, stats_dict). Each post in filtered_posts
        gets a "_credibility" key added; the kept dicts are the caller's
        own objects, mutated in place. ``stats_dict`` holds the ``total``,
        ``excluded``, ``flagged`` and ``authentic`` counts plus the score
        and flags of every excluded post under ``excluded_reasons``.
    """
    filtered: list[dict] = []
    stats = {
        "total": len(posts),
        "excluded": 0,
        "flagged": 0,
        "authentic": 0,
        "excluded_reasons": [],
    }

    for post in posts:
        result = scorer(post)
        result.is_excluded = result.score < min_score
        result.is_flagged = min_score <= result.score < flag_threshold

        if result.is_excluded:
            stats["excluded"] += 1
            stats["excluded_reasons"].append(
                {"score": round(result.score, 2), "flags": result.flags}
            )
            continue

        post["_credibility"] = {
            "score": round(result.score, 2),
            "label": result.label,
            "flags": result.flags,
            "is_flagged": result.is_flagged,
        }
        stats["flagged" if result.is_flagged else "authentic"] += 1
        filtered.append(post)

    return filtered, stats