167 lines
5.6 KiB
Python
167 lines
5.6 KiB
Python
"""Bluesky client using the AT Protocol API.

Search requires authentication. Set the BLUESKY_HANDLE and
BLUESKY_APP_PASSWORD environment variables. Create an app password at:
https://bsky.app/settings/app-passwords

Thread fetching works without auth via the public API.
"""
|
|
|
|
import os
|
|
import httpx
|
|
|
|
# Base URL of the public API host; the getPostThread and resolveHandle
# requests in this module are sent here.
BSKY_PUBLIC_API = "https://public.api.bsky.app"
|
|
# Base URL used for createSession (login) and for the authenticated
# searchPosts request.
BSKY_AUTH_API = "https://bsky.social"
|
|
|
|
|
|
async def _get_session() -> dict | None:
|
|
"""Authenticate with Bluesky and return session tokens, or None if no creds."""
|
|
handle = os.environ.get("BLUESKY_HANDLE")
|
|
app_password = os.environ.get("BLUESKY_APP_PASSWORD")
|
|
if not handle or not app_password:
|
|
return None
|
|
|
|
async with httpx.AsyncClient(timeout=15) as client:
|
|
resp = await client.post(
|
|
f"{BSKY_AUTH_API}/xrpc/com.atproto.server.createSession",
|
|
json={"identifier": handle, "password": app_password},
|
|
)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
|
|
|
|
def _format_post(post_view: dict) -> dict:
    """Flatten an AT Protocol post view into a plain summary dict.

    Accepts either a wrapper that carries the post under a "post" key or the
    post object itself. Missing text fields default to "" and missing
    counters default to 0.

    Args:
        post_view: The post view (or a wrapper around it) as decoded JSON.

    Returns:
        Dict with text, author_handle, author_display_name, created_at,
        like_count, repost_count, reply_count, uri, cid and url.
    """
    inner = post_view.get("post", post_view)
    body = inner.get("record", {})
    poster = inner.get("author", {})

    # Looked up once; each value feeds both its own field and the web URL.
    at_uri = inner.get("uri", "")
    poster_handle = poster.get("handle", "")

    summary = {
        "text": body.get("text", ""),
        "author_handle": poster_handle,
        "author_display_name": poster.get("displayName", ""),
        "created_at": body.get("createdAt", ""),
        "like_count": inner.get("likeCount", 0),
        "repost_count": inner.get("repostCount", 0),
        "reply_count": inner.get("replyCount", 0),
        "uri": at_uri,
        "cid": inner.get("cid", ""),
        "url": _uri_to_url(at_uri, poster_handle),
    }
    return summary
|
|
|
|
|
|
def _uri_to_url(uri: str, handle: str) -> str:
|
|
"""Convert an at:// URI to a bsky.app URL."""
|
|
# at://did:plc:xxx/app.bsky.feed.post/rkey -> https://bsky.app/profile/handle/post/rkey
|
|
if not uri.startswith("at://"):
|
|
return ""
|
|
parts = uri.split("/")
|
|
if len(parts) >= 5:
|
|
rkey = parts[-1]
|
|
return f"https://bsky.app/profile/{handle}/post/{rkey}"
|
|
return ""
|
|
|
|
|
|
async def search_posts(query: str, limit: int = 25, sort: str = "top") -> list[dict]:
    """Search Bluesky for posts matching a query.

    Requires BLUESKY_HANDLE and BLUESKY_APP_PASSWORD env vars.

    Args:
        query: Search terms.
        limit: Max results; values outside 1..100 are clamped into that range.
        sort: "top" (most liked) or "latest" (chronological).

    Returns:
        List of post dicts with: text, author_handle, author_display_name,
        created_at, like_count, repost_count, reply_count, uri, cid, url.

    Raises:
        RuntimeError: If Bluesky credentials are not configured.
        httpx.HTTPStatusError: If the login or search request returns an
            error status.
    """
    session = await _get_session()
    if not session:
        raise RuntimeError(
            "Bluesky search requires authentication. "
            "Set BLUESKY_HANDLE and BLUESKY_APP_PASSWORD environment variables. "
            "Create an app password at: https://bsky.app/settings/app-passwords"
        )

    params = {
        "q": query,
        # Clamp on both sides so zero or negative limits are never sent.
        "limit": max(1, min(limit, 100)),
        "sort": sort,
    }
    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.get(
            f"{BSKY_AUTH_API}/xrpc/app.bsky.feed.searchPosts",
            params=params,
            headers={"Authorization": f"Bearer {session['accessJwt']}"},
        )
        resp.raise_for_status()
        data = resp.json()

    return [_format_post(p) for p in data.get("posts", [])]
|
|
|
|
|
|
async def get_thread(uri: str, depth: int = 6) -> dict:
    """Fetch a Bluesky thread by AT URI or bsky.app URL.

    Args:
        uri: Either an at:// URI or a https://bsky.app/profile/.../post/... URL.
        depth: How many levels of replies to request; values outside
            0..1000 are clamped into that range.

    Returns:
        Dict with "post" (the root post, or {} when the response carries no
        post) and "replies", a flat list of reply post dicts. Each direct
        reply is followed by its own direct replies; deeper levels are not
        included even when ``depth`` requested them.

    Raises:
        ValueError: If a bsky.app URL cannot be parsed.
        httpx.HTTPStatusError: If any request returns an error status.
    """
    # Convert bsky.app URL to AT URI if needed
    if uri.startswith("https://bsky.app/"):
        uri = await _resolve_url_to_uri(uri)

    # Attach a bearer token only when credentials are configured.
    # NOTE(review): the token is issued by BSKY_AUTH_API but sent to
    # BSKY_PUBLIC_API; confirm the public host accepts it.
    headers = {}
    session = await _get_session()
    if session:
        headers["Authorization"] = f"Bearer {session['accessJwt']}"

    # Clamp on both sides so a negative depth is never sent.
    bounded_depth = max(0, min(depth, 1000))

    async with httpx.AsyncClient(timeout=15) as client:
        resp = await client.get(
            f"{BSKY_PUBLIC_API}/xrpc/app.bsky.feed.getPostThread",
            params={"uri": uri, "depth": bounded_depth},
            headers=headers,
        )
        resp.raise_for_status()
        data = resp.json()

    thread = data.get("thread", {})
    root_post = _format_post(thread) if "post" in thread else {}

    replies = []
    for reply in thread.get("replies", []):
        # Nodes without a "post" key are skipped along with their children.
        if "post" not in reply:
            continue
        replies.append(_format_post(reply))
        # Include nested replies one level deep
        replies.extend(
            _format_post(nested)
            for nested in reply.get("replies", [])
            if "post" in nested
        )

    return {"post": root_post, "replies": replies}
|
|
|
|
|
|
async def _resolve_url_to_uri(url: str) -> str:
|
|
"""Convert a bsky.app URL to an AT URI by resolving the handle."""
|
|
# https://bsky.app/profile/handle.bsky.social/post/rkey
|
|
parts = url.rstrip("/").split("/")
|
|
if len(parts) < 6:
|
|
raise ValueError(f"Invalid Bluesky URL: {url}")
|
|
|
|
handle = parts[4] # profile/{handle}
|
|
rkey = parts[6] # post/{rkey}
|
|
|
|
# Resolve handle to DID
|
|
async with httpx.AsyncClient(timeout=10) as client:
|
|
resp = await client.get(
|
|
f"{BSKY_PUBLIC_API}/xrpc/com.atproto.identity.resolveHandle",
|
|
params={"handle": handle},
|
|
)
|
|
resp.raise_for_status()
|
|
did = resp.json()["did"]
|
|
|
|
return f"at://{did}/app.bsky.feed.post/{rkey}"
|