399 lines
12 KiB
Python
399 lines
12 KiB
Python
"""Credibility scoring and bot/disinfo detection.

Assigns a 0.0–1.0 credibility score to each post based on heuristic signals.
Posts below the configured threshold are excluded or flagged so they don't
pollute the sentiment analysis.

Signals are platform-aware — each platform has different indicators of
inauthentic behavior.
"""
|
||
|
||
from __future__ import annotations

import re
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
|
||
|
||
|
||
@dataclass
class CredibilityResult:
    """Outcome of scoring one post for authenticity.

    The ``label`` property buckets ``score`` into one of four named bands.
    """

    # 0.0 (likely bot/disinfo) to 1.0 (likely authentic)
    score: float
    # Human-readable reasons that contributed to the score
    flags: list[str] = field(default_factory=list)
    # True when the score is below the minimum credibility score
    is_excluded: bool = False
    # True when the score sits between the minimum and the flag threshold
    is_flagged: bool = False

    @property
    def label(self) -> str:
        """Return the name of the highest band whose floor ``score`` reaches."""
        bands = (
            (0.7, "likely_authentic"),
            (0.5, "uncertain"),
            (0.3, "suspicious"),
        )
        for floor, name in bands:
            if self.score >= floor:
                return name
        # Anything under the lowest floor
        return "likely_inauthentic"
|
||
|
||
|
||
# --- Shared heuristics ---
|
||
|
||
# Regexes for text that is typical of bots, spam, or astroturfing.
# The regex source is echoed into the flags reported for a post, so the
# pattern strings themselves are part of the module's output.
_BOT_TEXT_PATTERNS = [
    re.compile(source)
    for source in (
        # Crypto/scam spam
        r"(?i)(dm me|check my bio|link in bio|click here|free giveaway)",
        r"(?i)(join my|subscribe to|follow me for|🔥.*🔥.*🔥)",
        # Astroturfing phrases
        r"(?i)(i (just )?(discovered|found|tried) this (amazing|incredible|awesome))",
        r"(?i)(game.?changer|life.?changing|you won'?t believe)",
        # Excessive hashtags (5+)
        r"(#\w+\s*){5,}",
        # Walls of emojis (10+ consecutive)
        r"[\U0001F300-\U0001FAFF]{10,}",
        # Repetitive characters (spammy emphasis)
        r"(.)\1{9,}",
    )
]
|
||
|
||
# Coordinated campaign indicators: identical or near-identical text
|
||
# This is checked at the batch level, not per-post
|
||
|
||
|
||
def _check_text_patterns(text: str) -> list[str]:
    """Return one flag for every spam heuristic that ``text`` trips.

    Each regex in ``_BOT_TEXT_PATTERNS`` that matches anywhere in ``text``
    contributes a ``bot_text_pattern`` flag carrying the first 60 characters
    of the regex source. Text shorter than 15 characters additionally gets
    a ``very_short_text`` flag, appended last.
    """
    hits = [
        f"bot_text_pattern: {rx.pattern[:60]}"
        for rx in _BOT_TEXT_PATTERNS
        if rx.search(text)
    ]
    if len(text) < 15:
        hits.append("very_short_text")
    return hits
|
||
|
||
|
||
def _engagement_ratio_score(
    likes: int, reposts: int, replies: int
) -> tuple[float, list[str]]:
    """Rate how organic a post's engagement mix looks.

    The heuristic is that amplified posts collect reposts or likes without
    anyone replying, while organic posts attract some discussion.

    Returns:
        A ``(score, flags)`` pair:

        - ``0.5`` with ``no_engagement`` when all three counts sum to zero.
        - ``0.3`` with ``high_repost_no_replies`` for more than 10 reposts
          and no replies.
        - ``0.4`` with ``high_likes_no_replies`` for more than 100 likes
          and no replies.
        - otherwise ``0.5`` plus half of the replies' share of all
          interactions (capped at 1.0), with no flags.
    """
    interactions = likes + reposts + replies
    if interactions == 0:
        return 0.5, ["no_engagement"]

    if replies == 0:
        # Amplification without discussion; the repost check wins over likes.
        if reposts > 10:
            return 0.3, [f"high_repost_no_replies: {reposts} reposts, 0 replies"]
        if likes > 100:
            return 0.4, [f"high_likes_no_replies: {likes} likes, 0 replies"]

    # Normal engagement: the larger the reply share, the higher the score.
    reply_share = replies / max(interactions, 1)
    return min(1.0, 0.5 + reply_share * 0.5), []
|
||
|
||
|
||
# --- Platform-specific scoring ---
|
||
|
||
|
||
def score_bluesky_post(post: dict) -> CredibilityResult:
    """Score a Bluesky post for credibility.

    Starts from 1.0 and subtracts 0.15 per text-pattern flag, 0.3 for a
    handle whose first label is 8+ hex characters, and 0.1 when the display
    name is missing or equal to the handle. The result is blended 60/40 with
    the engagement-ratio score and clamped to the 0.0-1.0 range.

    Args:
        post: Post dict; reads ``text``, ``author_handle``,
            ``author_display_name``, ``like_count``, ``repost_count`` and
            ``reply_count``, each optional.

    Returns:
        The clamped score together with every flag raised along the way.
    """
    body = post.get("text", "")
    author_handle = post.get("author_handle", "")
    shown_name = post.get("author_display_name", "")

    # Text pattern checks: every hit costs the same fixed amount.
    reasons = list(_check_text_patterns(body))
    credibility = 1.0 - 0.15 * len(reasons)

    # Randomly generated handles (long hex/number strings before the first dot)
    if re.match(r"^[a-f0-9]{8,}\.", author_handle):
        reasons.append(f"random_handle: {author_handle}")
        credibility -= 0.3

    # No display name set, or it merely repeats the handle
    if shown_name == author_handle or not shown_name:
        reasons.append("no_display_name")
        credibility -= 0.1

    # Engagement ratio carries 40% of the final score.
    engagement, engagement_reasons = _engagement_ratio_score(
        post.get("like_count", 0),
        post.get("repost_count", 0),
        post.get("reply_count", 0),
    )
    reasons += engagement_reasons
    blended = credibility * 0.6 + engagement * 0.4

    return CredibilityResult(score=max(0.0, min(1.0, blended)), flags=reasons)
|
||
|
||
|
||
def score_reddit_post(post: dict) -> CredibilityResult:
    """Score a Reddit post for credibility.

    Starts from 1.0 and subtracts a fixed penalty for each heuristic that
    fires; the total is clamped to the 0.0-1.0 range.

    Args:
        post: Post dict; reads ``selftext``, ``title``, ``author``,
            ``upvote_ratio``, ``score``, ``num_comments`` and ``is_self``,
            each optional.

    Returns:
        The clamped score together with every flag raised along the way.
    """
    author = post.get("author", "")
    ratio = post.get("upvote_ratio", 0.5)
    net_votes = post.get("score", 0)
    comment_count = post.get("num_comments", 0)
    # Prefer the self-text body; fall back to the title when it is empty.
    body = post.get("selftext", "") or post.get("title", "")

    # Text patterns: every hit costs the same fixed amount.
    reasons = list(_check_text_patterns(body))
    credibility = 1.0 - 0.15 * len(reasons)

    def penalize(reason: str, amount: float) -> None:
        """Record ``reason`` and deduct ``amount`` from the running score."""
        nonlocal credibility
        reasons.append(reason)
        credibility -= amount

    # Deleted author
    if author in ("[deleted]", "[removed]"):
        penalize("deleted_author", 0.2)

    # Suspicious username: letters, optional separator, then 4+ digits
    if re.match(r"^[A-Za-z]+[-_]?\d{4,}$", author):
        penalize(f"auto_generated_username: {author}", 0.15)

    # Low upvote ratio on a post that still has a positive score
    if ratio < 0.4 and net_votes > 0:
        penalize(f"highly_controversial: {ratio:.0%} upvote ratio", 0.1)

    # High score but zero comments = potential vote manipulation
    if net_votes > 100 and comment_count == 0:
        penalize(f"high_score_no_comments: {net_votes} score, 0 comments", 0.2)

    # Low-effort cross-post spam: very short title, external link, no selftext
    looks_like_link_spam = (
        len(post.get("title", "")) < 20
        and not post.get("is_self", True)
        and not post.get("selftext")
    )
    if looks_like_link_spam:
        penalize("possible_link_spam", 0.1)

    return CredibilityResult(score=max(0.0, min(1.0, credibility)), flags=reasons)
|
||
|
||
|
||
def score_reddit_comment(comment: dict) -> CredibilityResult:
    """Score a Reddit comment for credibility.

    Starts from 1.0 and subtracts 0.15 per text-pattern flag, 0.2 for a
    deleted/removed author, 0.15 for a letters-then-digits username, and
    0.15 when the comment score is below -5. The total is clamped to the
    0.0-1.0 range.

    Args:
        comment: Comment dict; reads ``body``, ``author`` and ``score``,
            each optional.

    Returns:
        The clamped score together with every flag raised along the way.
    """
    author = comment.get("author", "")
    votes = comment.get("score", 0)

    reasons = list(_check_text_patterns(comment.get("body", "")))
    credibility = 1.0 - 0.15 * len(reasons)

    author_is_gone = author in ("[deleted]", "[removed]")
    if author_is_gone:
        reasons.append("deleted_author")
        credibility -= 0.2

    # Letters, optional separator, then 4+ digits
    name_looks_generated = re.match(r"^[A-Za-z]+[-_]?\d{4,}$", author)
    if name_looks_generated:
        reasons.append(f"auto_generated_username: {author}")
        credibility -= 0.15

    # Heavily downvoted
    if votes < -5:
        reasons.append(f"heavily_downvoted: {votes}")
        credibility -= 0.15

    return CredibilityResult(score=max(0.0, min(1.0, credibility)), flags=reasons)
|
||
|
||
|
||
def score_hackernews_post(post: dict) -> CredibilityResult:
    """Score a HN story for credibility.

    HN is generally higher-signal than social media, but we still check
    for low-effort submissions and spammy patterns.

    Starts from 1.0, subtracts 0.1 per text-pattern flag and 0.1 for a story
    with zero points, then adds a 0.1 bonus capped at 1.0.

    Args:
        post: Story dict; reads ``title``, ``story_text`` and ``points``,
            each optional.

    Returns:
        The score clamped to the 0.0-1.0 range, together with every flag
        raised along the way.
    """
    score = 1.0
    flags: list[str] = []

    title = post.get("title", "")
    # Fall back to the title when story_text is missing or empty.
    text = post.get("story_text", "") or title
    points = post.get("points", 0)

    text_flags = _check_text_patterns(text)
    if text_flags:
        score -= 0.1 * len(text_flags)
        flags.extend(text_flags)

    # Zero points = the community didn't find it valuable
    if points == 0:
        flags.append("zero_points")
        score -= 0.1

    # HN bonus: applied after the penalties and capped at 1.0, so it offsets
    # up to one 0.1 penalty and never lifts a clean story above 1.0.
    score = min(1.0, score + 0.1)

    return CredibilityResult(score=max(0.0, min(1.0, score)), flags=flags)
|
||
|
||
|
||
def score_hackernews_comment(comment: dict) -> CredibilityResult:
    """Score a HN comment for credibility.

    Starts from 1.0, subtracts 0.1 per text-pattern flag found in
    ``comment_text``, then adds a 0.1 bonus capped at 1.0.

    Args:
        comment: Comment dict; reads the optional ``comment_text`` key.

    Returns:
        The score clamped to the 0.0-1.0 range, together with the
        text-pattern flags.
    """
    reasons = list(_check_text_patterns(comment.get("comment_text", "")))
    penalized = 1.0 - 0.1 * len(reasons)

    # HN comments are generally higher quality: bonus, capped at 1.0
    boosted = min(1.0, penalized + 0.1)

    return CredibilityResult(score=max(0.0, min(1.0, boosted)), flags=reasons)
|
||
|
||
|
||
# --- Batch-level coordination detection ---
|
||
|
||
|
||
def detect_coordination(posts: list[dict], text_key: str = "text") -> list[str]:
    """Detect coordinated inauthentic behavior across a batch of posts.

    Three independent checks run over the batch:

    - Exact duplicates: texts that are identical after stripping surrounding
      whitespace and lowercasing (copy-paste campaigns).
    - Template prefixes (needs 5+ texts): at least 30% of the texts share
      the same first 80 normalized characters. Texts shorter than 50
      characters are ignored by this check.
    - Bursts (needs 5+ usable timestamps): at least half of the timestamped
      posts fall inside a single 5-minute window.

    Timestamps are read from ``created_at`` or, failing that,
    ``created_utc``. ISO-8601 strings and numeric epoch seconds are
    accepted; values that cannot be converted are skipped. Strings without
    a UTC offset are assumed to be UTC so they can be ordered together with
    offset-aware values.

    Args:
        posts: Post dicts from any platform.
        text_key: Key under which each post stores its text.

    Returns:
        A list of warning strings, in the order duplicates, prefixes, burst.
        Empty when nothing looks coordinated or when no post has text.
    """
    warnings: list[str] = []
    texts = [text for text in (p.get(text_key) for p in posts) if text]

    if not texts:
        return warnings

    normalized = [t.strip().lower() for t in texts]

    # Exact duplicates
    duplicates = {text: n for text, n in Counter(normalized).items() if n > 1}
    if duplicates:
        total_dupes = sum(duplicates.values())
        warnings.append(
            f"COORDINATION WARNING: {len(duplicates)} duplicate texts found "
            f"({total_dupes} total copies). Possible copy-paste campaign."
        )

    # Near-duplicates, simplified: count texts of 50+ characters by their
    # first 80 characters and report any prefix used by >=30% of the texts.
    if len(texts) >= 5:
        prefix_counts = Counter(t[:80] for t in normalized if len(t) >= 50)
        share_needed = len(texts) * 0.3
        for prefix, count in prefix_counts.items():
            if count >= share_needed:
                warnings.append(
                    f"COORDINATION WARNING: {count}/{len(texts)} posts share "
                    f"a common prefix ({prefix[:50]}...). Possible template campaign."
                )

    # Burst detection: collect whatever timestamps can be converted.
    timestamps: list[datetime] = []
    for p in posts:
        created = p.get("created_at") or p.get("created_utc")
        try:
            if isinstance(created, str):
                moment = datetime.fromisoformat(created.replace("Z", "+00:00"))
            elif isinstance(created, (int, float)):
                moment = datetime.fromtimestamp(created, tz=timezone.utc)
            else:
                continue
        except (ValueError, TypeError, OverflowError, OSError):
            # Best effort: an unusable timestamp just drops out of this check.
            continue
        if moment.tzinfo is None:
            # Naive and aware datetimes cannot be compared; treat naive as UTC.
            moment = moment.replace(tzinfo=timezone.utc)
        timestamps.append(moment)

    if len(timestamps) >= 5:
        timestamps.sort()
        window = timedelta(seconds=300)
        needed = len(timestamps) * 0.5
        # Sliding window over the sorted list: ``end`` only moves forward,
        # so each timestamp is visited a constant number of times.
        end = 0
        for start, opened_at in enumerate(timestamps):
            closes_at = opened_at + window
            while end < len(timestamps) and timestamps[end] <= closes_at:
                end += 1
            in_window = end - start
            if in_window >= needed:
                warnings.append(
                    f"COORDINATION WARNING: {in_window}/{len(timestamps)} posts "
                    f"appeared within a 5-minute window. Possible coordinated posting."
                )
                # One burst warning per batch is enough.
                break

    return warnings
|
||
|
||
|
||
def filter_and_annotate(
    posts: list[dict],
    scorer,
    min_score: float = 0.3,
    flag_threshold: float = 0.5,
) -> tuple[list[dict], dict]:
    """Score every post, drop the low-credibility ones, annotate the rest.

    Args:
        posts: List of post dicts from any platform.
        scorer: Callable that takes one post and returns a result object
            exposing ``score``, ``flags`` and ``label``
            (e.g., score_reddit_post).
        min_score: Posts scoring below this are excluded.
        flag_threshold: Posts scoring from ``min_score`` up to, but not
            including, this value are kept and marked as flagged.

    Returns:
        Tuple of (filtered_posts, stats_dict).

        Kept posts are the same dict objects that were passed in, each
        mutated to carry a "_credibility" key. The stats dict holds the
        "total", "excluded", "flagged" and "authentic" counts plus
        "excluded_reasons", a list with the rounded score and flags of each
        excluded post.
    """
    total = len(posts)
    kept: list[dict] = []
    excluded_reasons: list[dict] = []
    flagged_count = 0

    for post in posts:
        verdict = scorer(post)
        excluded = verdict.score < min_score
        flagged = min_score <= verdict.score < flag_threshold
        # Record the decision on the result object as well.
        verdict.is_excluded = excluded
        verdict.is_flagged = flagged
        rounded = round(verdict.score, 2)

        if excluded:
            excluded_reasons.append({"score": rounded, "flags": verdict.flags})
            continue

        post["_credibility"] = {
            "score": rounded,
            "label": verdict.label,
            "flags": verdict.flags,
            "is_flagged": flagged,
        }
        if flagged:
            flagged_count += 1
        kept.append(post)

    # Every kept post is either flagged or counted as authentic.
    stats = {
        "total": total,
        "excluded": len(excluded_reasons),
        "flagged": flagged_count,
        "authentic": len(kept) - flagged_count,
        "excluded_reasons": excluded_reasons,
    }
    return kept, stats
|