Layer 1 - Data Acquisition for Bluesky/AT Protocol social graph and content.
/plugin marketplace add plurigrid/asi/plugin install plurigrid-asi-skills@plurigrid/asiThis skill inherits all available tools. When active, it can use any tool Claude has access to.
Layer 1 - Data Acquisition for Bluesky/AT Protocol social graph and content.
GF(3) Trit: +1 (Generator) — Produces data streams for downstream processing.
# Create session
curl -X POST https://bsky.social/xrpc/com.atproto.server.createSession \
-H "Content-Type: application/json" \
-d '{"identifier": "handle.bsky.social", "password": "app-password-here"}'
# Response contains accessJwt and refreshJwt
export BSKY_TOKEN="eyJ..."
# Authorization endpoint: https://bsky.social/oauth/authorize
# Token endpoint: https://bsky.social/oauth/token
# Scopes: atproto, transition:generic
Get all posts from a user by handle or DID.
# Resolve handle to DID
curl "https://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle=user.bsky.social"
# Get author feed (paginated)
curl "https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed?actor=did:plc:xxx&limit=100&cursor=$CURSOR" \
-H "Authorization: Bearer $BSKY_TOKEN"
Pagination loop:
def fetch_all_posts(actor: str, token: str) -> list:
posts, cursor = [], None
while True:
url = f"https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed?actor={actor}&limit=100"
if cursor:
url += f"&cursor={cursor}"
resp = requests.get(url, headers={"Authorization": f"Bearer {token}"})
data = resp.json()
posts.extend(data.get("feed", []))
cursor = data.get("cursor")
if not cursor:
break
time.sleep(0.5) # Rate limit respect
return posts
Map likes, reposts, replies, and quotes for a post.
# Get likes
curl "https://bsky.social/xrpc/app.bsky.feed.getLikes?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&limit=100" \
-H "Authorization: Bearer $BSKY_TOKEN"
# Get reposts
curl "https://bsky.social/xrpc/app.bsky.feed.getRepostedBy?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&limit=100" \
-H "Authorization: Bearer $BSKY_TOKEN"
# Get quotes (search for post URI)
curl "https://bsky.social/xrpc/app.bsky.feed.getQuotes?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&limit=100" \
-H "Authorization: Bearer $BSKY_TOKEN"
Engagement record schema:
CREATE TABLE engagement (
post_uri VARCHAR PRIMARY KEY,
author_did VARCHAR,
like_count INTEGER,
repost_count INTEGER,
reply_count INTEGER,
quote_count INTEGER,
likers VARCHAR[], -- Array of DIDs
reposters VARCHAR[],
indexed_at TIMESTAMP
);
Real-time monitoring via Firehose or polling.
# Firehose (WebSocket) - all network events
wscat -c wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos
# Polling fallback - notifications endpoint
curl "https://bsky.social/xrpc/app.bsky.notification.listNotifications?limit=50" \
-H "Authorization: Bearer $BSKY_TOKEN"
Firehose filter (Python):
import websocket
import cbor2
def on_message(ws, message):
header, body = cbor2.loads(message)
if header.get("op") == 1: # Commit
for op in body.get("ops", []):
if "app.bsky.feed.post" in op.get("path", ""):
record = op.get("record", {})
# Check for mentions in facets
for facet in record.get("facets", []):
for feature in facet.get("features", []):
if feature.get("$type") == "app.bsky.richtext.facet#mention":
if feature.get("did") == TARGET_DID:
yield record
ws = websocket.WebSocketApp("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos",
on_message=on_message)
Full conversation tree from a post.
curl "https://bsky.social/xrpc/app.bsky.feed.getPostThread?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&depth=10&parentHeight=10" \
-H "Authorization: Bearer $BSKY_TOKEN"
Thread tree structure:
def extract_thread_tree(thread_data: dict) -> dict:
"""Flatten thread to adjacency list for DuckDB."""
nodes = []
edges = []
def walk(node, parent_uri=None):
post = node.get("post", {})
uri = post.get("uri")
nodes.append({
"uri": uri,
"author_did": post.get("author", {}).get("did"),
"text": post.get("record", {}).get("text"),
"created_at": post.get("record", {}).get("createdAt"),
"depth": node.get("depth", 0)
})
if parent_uri:
edges.append({"parent": parent_uri, "child": uri})
for reply in node.get("replies", []):
walk(reply, uri)
walk(thread_data.get("thread", {}))
return {"nodes": nodes, "edges": edges}
First and second-order connections.
# Followers
curl "https://bsky.social/xrpc/app.bsky.graph.getFollowers?actor=did:plc:xxx&limit=100&cursor=$CURSOR" \
-H "Authorization: Bearer $BSKY_TOKEN"
# Following
curl "https://bsky.social/xrpc/app.bsky.graph.getFollows?actor=did:plc:xxx&limit=100&cursor=$CURSOR" \
-H "Authorization: Bearer $BSKY_TOKEN"
Second-order expansion:
def get_follower_network(actor: str, depth: int = 2) -> dict:
"""BFS expansion of follower graph."""
visited = set()
edges = []
queue = [(actor, 0)]
while queue:
current, d = queue.pop(0)
if current in visited or d > depth:
continue
visited.add(current)
followers = paginate_all("app.bsky.graph.getFollowers", actor=current)
for f in followers:
did = f["did"]
edges.append({"from": did, "to": current, "type": "follows"})
if d < depth:
queue.append((did, d + 1))
return {"nodes": list(visited), "edges": edges}
| Endpoint Category | Rate Limit | Window |
|---|---|---|
| Read (feed, graph) | 3000/5min | Rolling |
| Write (post, like) | 1500/hr | Rolling |
| Firehose | Unlimited | - |
| Search | 100/min | Rolling |
Backoff strategy:
def rate_limited_request(url, headers, max_retries=5):
for attempt in range(max_retries):
resp = requests.get(url, headers=headers)
if resp.status_code == 429:
wait = int(resp.headers.get("Retry-After", 2 ** attempt))
time.sleep(wait)
continue
return resp
raise Exception("Rate limit exceeded")
-- Posts table
CREATE TABLE bsky_posts (
uri VARCHAR PRIMARY KEY,
cid VARCHAR,
author_did VARCHAR,
text TEXT,
created_at TIMESTAMP,
reply_parent VARCHAR,
reply_root VARCHAR,
embed_type VARCHAR,
langs VARCHAR[],
labels VARCHAR[],
ingested_at TIMESTAMP DEFAULT now()
);
-- Social graph
CREATE TABLE bsky_graph (
subject_did VARCHAR,
object_did VARCHAR,
relation VARCHAR, -- 'follows', 'blocks', 'mutes'
created_at TIMESTAMP,
PRIMARY KEY (subject_did, object_did, relation)
);
-- Engagement events
CREATE TABLE bsky_engagement (
uri VARCHAR,
actor_did VARCHAR,
action VARCHAR, -- 'like', 'repost', 'quote', 'reply'
target_uri VARCHAR,
created_at TIMESTAMP,
PRIMARY KEY (uri)
);
-- Insert from JSON
COPY bsky_posts FROM 'posts.json' (FORMAT JSON, ARRAY true);
| Lexicon | Purpose |
|---|---|
app.bsky.feed.getAuthorFeed | User's posts |
app.bsky.feed.getPostThread | Thread tree |
app.bsky.feed.getLikes | Who liked |
app.bsky.feed.getRepostedBy | Who reposted |
app.bsky.graph.getFollowers | Follower list |
app.bsky.graph.getFollows | Following list |
app.bsky.actor.getProfile | User profile |
com.atproto.sync.subscribeRepos | Firehose |
Output feeds into:
# Pipeline handoff
posts = fetch_all_posts(actor, token)
duckdb.execute("INSERT INTO bsky_posts SELECT * FROM read_json(?)", [posts])
# Signal next layer
publish_event("bsky.ingestion.complete", {"actor": actor, "count": len(posts)})