Files
stegasoo/agentstuff/sentiment_agent/credibility.py
adlee-was-taken 4607ff27dd Minor fixes
2026-04-04 16:29:20 -04:00

399 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Credibility scoring and bot/disinfo detection.
Assigns a 0.01.0 credibility score to each post based on heuristic signals.
Posts below the configured threshold are excluded or flagged so they don't
pollute the sentiment analysis.
Signals are platform-aware — each platform has different indicators of
inauthentic behavior.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
@dataclass
class CredibilityResult:
"""Credibility assessment for a single post."""
score: float # 0.0 (likely bot/disinfo) to 1.0 (likely authentic)
flags: list[str] = field(default_factory=list) # human-readable reasons
is_excluded: bool = False # below min_credibility_score
is_flagged: bool = False # between min and flag threshold
@property
def label(self) -> str:
if self.score >= 0.7:
return "likely_authentic"
if self.score >= 0.5:
return "uncertain"
if self.score >= 0.3:
return "suspicious"
return "likely_inauthentic"
# --- Shared heuristics ---
# Common bot patterns in text
_BOT_TEXT_PATTERNS = [
# Crypto/scam spam
re.compile(r"(?i)(dm me|check my bio|link in bio|click here|free giveaway)"),
re.compile(r"(?i)(join my|subscribe to|follow me for|🔥.*🔥.*🔥)"),
# Astroturfing phrases
re.compile(r"(?i)(i (just )?(discovered|found|tried) this (amazing|incredible|awesome))"),
re.compile(r"(?i)(game.?changer|life.?changing|you won'?t believe)"),
# Excessive hashtags (5+)
re.compile(r"(#\w+\s*){5,}"),
# Walls of emojis (10+ consecutive)
re.compile(r"[\U0001F300-\U0001FAFF]{10,}"),
# Repetitive characters (spammy emphasis)
re.compile(r"(.)\1{9,}"),
]
# Coordinated campaign indicators: identical or near-identical text
# This is checked at the batch level, not per-post
def _check_text_patterns(text: str) -> list[str]:
"""Check text against common bot/spam patterns."""
flags = []
for pattern in _BOT_TEXT_PATTERNS:
if pattern.search(text):
flags.append(f"bot_text_pattern: {pattern.pattern[:60]}")
if len(text) < 15:
flags.append("very_short_text")
return flags
def _engagement_ratio_score(
likes: int, reposts: int, replies: int
) -> tuple[float, list[str]]:
"""Score based on engagement ratios.
Authentic posts tend to have a mix of likes, replies, and reposts.
Bot-amplified posts often have inflated likes with very few replies,
or massive repost counts with no discussion.
"""
flags = []
total = likes + reposts + replies
if total == 0:
return 0.5, ["no_engagement"]
# High repost-to-reply ratio suggests amplification without discussion
if reposts > 0 and replies == 0 and reposts > 10:
flags.append(f"high_repost_no_replies: {reposts} reposts, 0 replies")
return 0.3, flags
# Extremely high like count with zero replies is suspicious
if likes > 100 and replies == 0:
flags.append(f"high_likes_no_replies: {likes} likes, 0 replies")
return 0.4, flags
# Normal engagement
return min(1.0, 0.5 + (replies / max(total, 1)) * 0.5), flags
# --- Platform-specific scoring ---
def score_bluesky_post(post: dict) -> CredibilityResult:
"""Score a Bluesky post for credibility."""
score = 1.0
flags: list[str] = []
text = post.get("text", "")
handle = post.get("author_handle", "")
display_name = post.get("author_display_name", "")
likes = post.get("like_count", 0)
reposts = post.get("repost_count", 0)
replies = post.get("reply_count", 0)
# Text pattern checks
text_flags = _check_text_patterns(text)
if text_flags:
score -= 0.15 * len(text_flags)
flags.extend(text_flags)
# Handle heuristics
# Randomly generated handles (long hex/number strings)
if re.match(r"^[a-f0-9]{8,}\.", handle):
flags.append(f"random_handle: {handle}")
score -= 0.3
# No display name set
if not display_name or display_name == handle:
flags.append("no_display_name")
score -= 0.1
# Engagement ratio
eng_score, eng_flags = _engagement_ratio_score(likes, reposts, replies)
flags.extend(eng_flags)
score = score * 0.6 + eng_score * 0.4
return CredibilityResult(score=max(0.0, min(1.0, score)), flags=flags)
def score_reddit_post(post: dict) -> CredibilityResult:
"""Score a Reddit post for credibility."""
score = 1.0
flags: list[str] = []
text = post.get("selftext", "") or post.get("title", "")
author = post.get("author", "")
upvote_ratio = post.get("upvote_ratio", 0.5)
post_score = post.get("score", 0)
num_comments = post.get("num_comments", 0)
# Text patterns
text_flags = _check_text_patterns(text)
if text_flags:
score -= 0.15 * len(text_flags)
flags.extend(text_flags)
# Deleted author
if author in ("[deleted]", "[removed]"):
flags.append("deleted_author")
score -= 0.2
# Suspicious username patterns (random alphanumeric + numbers)
if re.match(r"^[A-Za-z]+[-_]?\d{4,}$", author):
flags.append(f"auto_generated_username: {author}")
score -= 0.15
# Very controversial ratio (lots of up AND down votes)
if upvote_ratio < 0.4 and post_score > 0:
flags.append(f"highly_controversial: {upvote_ratio:.0%} upvote ratio")
score -= 0.1
# High score but zero comments = potential vote manipulation
if post_score > 100 and num_comments == 0:
flags.append(f"high_score_no_comments: {post_score} score, 0 comments")
score -= 0.2
# Low-effort cross-post spam: very short title, external link, no selftext
if (
len(post.get("title", "")) < 20
and not post.get("is_self", True)
and not post.get("selftext")
):
flags.append("possible_link_spam")
score -= 0.1
return CredibilityResult(score=max(0.0, min(1.0, score)), flags=flags)
def score_reddit_comment(comment: dict) -> CredibilityResult:
"""Score a Reddit comment for credibility."""
score = 1.0
flags: list[str] = []
body = comment.get("body", "")
author = comment.get("author", "")
comment_score = comment.get("score", 0)
text_flags = _check_text_patterns(body)
if text_flags:
score -= 0.15 * len(text_flags)
flags.extend(text_flags)
if author in ("[deleted]", "[removed]"):
flags.append("deleted_author")
score -= 0.2
if re.match(r"^[A-Za-z]+[-_]?\d{4,}$", author):
flags.append(f"auto_generated_username: {author}")
score -= 0.15
# Heavily downvoted
if comment_score < -5:
flags.append(f"heavily_downvoted: {comment_score}")
score -= 0.15
return CredibilityResult(score=max(0.0, min(1.0, score)), flags=flags)
def score_hackernews_post(post: dict) -> CredibilityResult:
"""Score a HN story for credibility.
HN is generally higher-signal than social media, but we still check
for low-effort submissions and spammy patterns.
"""
score = 1.0
flags: list[str] = []
title = post.get("title", "")
text = post.get("story_text", "") or title
points = post.get("points", 0)
num_comments = post.get("num_comments", 0)
text_flags = _check_text_patterns(text)
if text_flags:
score -= 0.1 * len(text_flags)
flags.extend(text_flags)
# Zero points = the community didn't find it valuable
if points == 0:
flags.append("zero_points")
score -= 0.1
# HN is generally more credible, start with a bonus
score = min(1.0, score + 0.1)
return CredibilityResult(score=max(0.0, min(1.0, score)), flags=flags)
def score_hackernews_comment(comment: dict) -> CredibilityResult:
"""Score a HN comment for credibility."""
score = 1.0
flags: list[str] = []
text = comment.get("comment_text", "")
text_flags = _check_text_patterns(text)
if text_flags:
score -= 0.1 * len(text_flags)
flags.extend(text_flags)
# HN comments are generally higher quality
score = min(1.0, score + 0.1)
return CredibilityResult(score=max(0.0, min(1.0, score)), flags=flags)
# --- Batch-level coordination detection ---
def detect_coordination(posts: list[dict], text_key: str = "text") -> list[str]:
"""Detect coordinated inauthentic behavior across a batch of posts.
Looks for:
- Duplicate or near-duplicate text (copy-paste campaigns)
- Burst posting (many posts in a very short window)
- Same talking points with minor variations
Returns a list of warning strings.
"""
warnings: list[str] = []
texts = [p.get(text_key, "") for p in posts if p.get(text_key)]
if not texts:
return warnings
# Exact duplicates
seen: dict[str, int] = {}
for t in texts:
normalized = t.strip().lower()
seen[normalized] = seen.get(normalized, 0) + 1
duplicates = {text: count for text, count in seen.items() if count > 1}
if duplicates:
total_dupes = sum(duplicates.values())
warnings.append(
f"COORDINATION WARNING: {len(duplicates)} duplicate texts found "
f"({total_dupes} total copies). Possible copy-paste campaign."
)
# Near-duplicates: check if many posts share a long common substring
# (simplified: check if >30% of posts start with the same 50+ chars)
if len(texts) >= 5:
prefixes: dict[str, int] = {}
for t in texts:
prefix = t.strip().lower()[:80]
if len(prefix) >= 50:
prefixes[prefix] = prefixes.get(prefix, 0) + 1
for prefix, count in prefixes.items():
if count >= len(texts) * 0.3:
warnings.append(
f"COORDINATION WARNING: {count}/{len(texts)} posts share "
f"a common prefix ({prefix[:50]}...). Possible template campaign."
)
# Burst detection: if timestamps are available
timestamps = []
for p in posts:
created = p.get("created_at") or p.get("created_utc")
if isinstance(created, str):
try:
timestamps.append(datetime.fromisoformat(created.replace("Z", "+00:00")))
except (ValueError, TypeError):
pass
elif isinstance(created, (int, float)):
timestamps.append(datetime.fromtimestamp(created, tz=timezone.utc))
if len(timestamps) >= 5:
timestamps.sort()
# Check if >50% of posts landed within a 5-minute window
window_seconds = 300
for i in range(len(timestamps) - 2):
window_end = timestamps[i] + __import__("datetime").timedelta(seconds=window_seconds)
in_window = sum(1 for t in timestamps if timestamps[i] <= t <= window_end)
if in_window >= len(timestamps) * 0.5:
warnings.append(
f"COORDINATION WARNING: {in_window}/{len(timestamps)} posts "
f"appeared within a 5-minute window. Possible coordinated posting."
)
break
return warnings
def filter_and_annotate(
posts: list[dict],
scorer,
min_score: float = 0.3,
flag_threshold: float = 0.5,
) -> tuple[list[dict], dict]:
"""Score all posts, filter out low-credibility ones, and annotate the rest.
Args:
posts: List of post dicts from any platform.
scorer: A scoring function (e.g., score_reddit_post).
min_score: Posts below this are excluded.
flag_threshold: Posts between min_score and this are flagged.
Returns:
Tuple of (filtered_posts, stats_dict).
Each post in filtered_posts gets a "_credibility" key added.
"""
filtered = []
stats = {
"total": len(posts),
"excluded": 0,
"flagged": 0,
"authentic": 0,
"excluded_reasons": [],
}
for post in posts:
result = scorer(post)
result.is_excluded = result.score < min_score
result.is_flagged = min_score <= result.score < flag_threshold
if result.is_excluded:
stats["excluded"] += 1
stats["excluded_reasons"].append(
{"score": round(result.score, 2), "flags": result.flags}
)
continue
post["_credibility"] = {
"score": round(result.score, 2),
"label": result.label,
"flags": result.flags,
"is_flagged": result.is_flagged,
}
if result.is_flagged:
stats["flagged"] += 1
else:
stats["authentic"] += 1
filtered.append(post)
return filtered, stats