From d0a08a5e3ffc7615eba7bd4663e0a8b22a13accc Mon Sep 17 00:00:00 2001 From: Travis Shears Date: Thu, 25 Sep 2025 20:53:15 +0200 Subject: [PATCH 1/2] fix bluesky proc in api --- src/micro_blog/api.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/micro_blog/api.clj b/src/micro_blog/api.clj index b200cb6..f28b078 100644 --- a/src/micro_blog/api.clj +++ b/src/micro_blog/api.clj @@ -17,7 +17,7 @@ (defn blue-sky-proc-handler [_request] (let [msg "Procing BlueSky Scrape"] (tel/log! :info msg) - (micro-blog.mastodon/run) + (micro-blog.blue-sky/run) {:status 200 :body msg})) From a165321a8e217242d31c92e747d334e31e104ec6 Mon Sep 17 00:00:00 2001 From: Travis Shears Date: Mon, 1 Dec 2025 17:37:07 +0100 Subject: [PATCH 2/2] remove old node code and try to fix bluesky bug --- README.md | 15 ++- build.sh | 11 +- old_node_src/bluesky.ts | 155 ----------------------- old_node_src/mastodon.ts | 103 --------------- old_node_src/nostr.ts | 106 ---------------- old_node_src/pixelfed.ts | 107 ---------------- old_node_src/pocketbase.ts | 223 --------------------------------- src/micro_blog/blue_sky.clj | 2 +- src/micro_blog/pocket_base.clj | 20 +-- 9 files changed, 36 insertions(+), 706 deletions(-) delete mode 100644 old_node_src/bluesky.ts delete mode 100644 old_node_src/mastodon.ts delete mode 100644 old_node_src/nostr.ts delete mode 100644 old_node_src/pixelfed.ts delete mode 100644 old_node_src/pocketbase.ts diff --git a/README.md b/README.md index d523f33..ee89b7d 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,21 @@ instance. - [x] BlueSky - [ ] Pixelfed - [x] Mastodon -- [ ] Nostr +- [x] Nostr +## Dev + +Run the app locally: + +```shell +$ clj -M -m micro-blog.main +``` + +or just run parts via repl: + +```shell +$ clj +``` ## Deployment diff --git a/build.sh b/build.sh index a5f2a26..6330d68 100755 --- a/build.sh +++ b/build.sh @@ -3,7 +3,14 @@ set -e export AWS_PROFILE=personal +export AWS_REGION=eu-central-1 -docker buildx build --platform linux/amd64,linux/arm64 -t 853019563312.dkr.ecr.eu-central-1.amazonaws.com/micro-blog-fetchers-homelabstack:latest --push . +REPO_NAME="micro-blog-fetchers-homelabstack" -echo "Image pushed to ECR" +if ! aws ecr describe-repositories --repository-names "$REPO_NAME" >/dev/null 2>&1; then + aws ecr create-repository --repository-name "$REPO_NAME" +fi + +docker buildx build --platform linux/amd64,linux/arm64 -t "853019563312.dkr.ecr.eu-central-1.amazonaws.com/$REPO_NAME:latest" --push . + +echo "Docker image built and pushed to AWS ECR" diff --git a/old_node_src/bluesky.ts b/old_node_src/bluesky.ts deleted file mode 100644 index 98e62a3..0000000 --- a/old_node_src/bluesky.ts +++ /dev/null @@ -1,155 +0,0 @@ -import pino from "pino"; -import { MicroBlogBackend } from "./pocketbase"; - -// logger -const logger = pino(); - -// pocketbase -const pb = new MicroBlogBackend(logger); - -type Session = { - did: string; - accessJwt: string; - refreshJwt: string; -}; - -type BlueSkyPost = { - cid: string; - embed?: { - images: { - fullsize: string; - alt: string; - }[]; - }; - record: { - createdAt: string; // '2024-06-25T05:32:06.269Z', - // embed: { '$type': 'app.bsky.embed.images', images: [Array] }, - // facets: [ [Object] ], - text: string; - facets: { - features: { - $type: string; //"app.bsky.richtext.facet#tag",\n' + - tag?: string; // "cooking"\n' + - }[]; - }[]; - }; -}; - -const createSession = async (): Promise => { - const identifier = process.env.BLUE_SKY_USERNAME; - const apiKey = process.env.BLUE_SKY_API_KEY; - const body = JSON.stringify({ identifier, password: apiKey }); - const url = "https://bsky.social/xrpc/com.atproto.server.createSession"; - const res = await fetch(url, { - method: "POST", - headers: { - "Content-Type": "application/json", - }, - body: body, - }); - const data = (await res.json()) as Session; - return data; -}; - -const limit = 10; -const getPostsUntilID = async ( - session: Session, - id: string, - cursor: string | null = null, - oldFeed: BlueSkyPost[] = [], -): Promise => { - const params = new URLSearchParams(); - params.append("actor", session.did); - params.append("limit", limit.toString()); - if (cursor) { - params.append("cursor", cursor); - } - - const urlWithParams = new URL( - "https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed", - ); - urlWithParams.search = params.toString(); - - const res = await fetch(urlWithParams, { - headers: { - Accept: "application/json", - Authorization: `Bearer ${session.accessJwt}`, - }, - }); - const rawData = (await res.json()) as { - feed: { post: BlueSkyPost }[]; - cursor: string | null; - }; - const rawFeed = rawData.feed; - const feed = rawFeed.map((item) => item.post); - cursor = rawData?.cursor; - const filteredFeed = []; - for (const post of feed) { - if (post.cid === id) { - break; - } - filteredFeed.push(post); - } - - // the post id we are searching until is in the res so return - if (filteredFeed.length !== feed.length) { - return filteredFeed.concat(oldFeed); - } - // there are more posts to add before the id - if (feed.length === limit) { - return getPostsUntilID(session, id, cursor, oldFeed.concat(feed)); - } - - // the id was not found in the feed, return everything - return oldFeed.concat(feed); -}; - -const savePost = async (post: BlueSkyPost) => { - const postData = { - remoteId: post.cid, - posted: post.record.createdAt, - source: "blue_sky" as const, - fullPost: post, - authorId: "travisshears.bsky.social", - }; - return await pb.savePost(postData); -}; - -const saveTags = async (post: BlueSkyPost, postId: string) => { - for (const facet of post.record.facets) { - for (const feature of facet.features) { - if (feature.$type === "app.bsky.richtext.facet#tag") { - const tag = feature.tag; - if (tag) { - await pb.setTag(tag, postId); - } - } - } - } -}; - -const saveImages = async (post: BlueSkyPost, postId: string) => { - const images = post.embed?.images ?? []; - for (const image of images) { - await pb.saveAndSetImage( - { remoteURL: image.fullsize, alt: image.alt }, - postId, - ); - } -}; - -(async () => { - const session = await createSession(); - const lastSavedPostId = await pb.getLatestPostRemoteIDBySource("blue_sky"); - const posts = await getPostsUntilID(session, lastSavedPostId ?? ""); - posts.reverse(); // save the oldest post first so if we fail posts are not lost on the next run - if (posts.length === 0) { - logger.info("No new posts to save"); - } - for (const post of posts) { - logger.info({ post }, "saving post"); - const savedNewPost = await savePost(post); - await saveTags(post, savedNewPost.id); - await saveImages(post, savedNewPost.id); - } -})(); diff --git a/old_node_src/mastodon.ts b/old_node_src/mastodon.ts deleted file mode 100644 index ef14cc6..0000000 --- a/old_node_src/mastodon.ts +++ /dev/null @@ -1,103 +0,0 @@ -import pino from "pino"; -import { MicroBlogBackend } from "./pocketbase"; - -// custom destination formatter -const logger = pino(); - -const pb = new MicroBlogBackend(logger); - -const baseURL = process.env.MASTODON_BASE_URL; -const accountId = process.env.MASTODON_ACCOUNT_ID; - -type MastodonPost = { - media_attachments: { - type: string; //'image', - url: string; - description: string; // 'Blurry gate', - }[]; - id: string; - content: string; - account: { - id: string; - }; - created_at: string; - tags: { name: string }[]; -}; - -const getPostUntilId = async ({ - lastSavedId, - maxId, - carryPosts = [], -}: { - lastSavedId?: string; - maxId?: string; - carryPosts?: MastodonPost[]; -}): Promise => { - const params = new URLSearchParams(); - params.append("limit", "10"); - const urlWithParams = new URL( - `${baseURL}/api/v1/accounts/${accountId}/statuses`, - ); - urlWithParams.search = params.toString(); - - const res = await fetch(urlWithParams); - const posts = (await res.json()) as MastodonPost[]; - const containsId = posts.some((post) => post.id === lastSavedId); - - if (!containsId && posts.length >= 5) { - return getPostUntilId({ - lastSavedId, - carryPosts: carryPosts?.concat(posts), - maxId: posts[posts.length - 1]?.id, - }); - } - - const allPosts = carryPosts?.concat(posts).reverse(); - if (lastSavedId) { - const index = allPosts.findIndex((post) => post.id === lastSavedId); - return allPosts.slice(index + 1); - } - return allPosts; -}; - -const savePost = async (post: MastodonPost) => { - const postData = { - remoteId: post.id, - authorId: post.account.id, - posted: post.created_at, - source: "mastodon" as const, - fullPost: post, - }; - return await pb.savePost(postData); -}; - -const saveTags = async (post: MastodonPost, postId: string) => { - logger.info({ tags: post.tags }, "saving tags"); - for (const tag of post.tags) { - await pb.setTag(tag.name, postId); - } -}; - -const saveImages = async (post: MastodonPost, postId: string) => { - logger.info({ images: post.media_attachments }, "saving images"); - for (const image of post.media_attachments) { - await pb.saveAndSetImage( - { remoteURL: image.url, alt: image.description }, - postId, - ); - } -}; - -(async () => { - const lastSavedPostId = await pb.getLatestPostRemoteIDBySource("mastodon"); - console.log({ lastSavedPostId }); - const posts = await getPostUntilId({ lastSavedId: lastSavedPostId }); - console.log({ posts }); - - for (const post of posts) { - logger.info({ post }, "saving post"); - const savedNewPost = await savePost(post); - await saveTags(post, savedNewPost.id); - await saveImages(post, savedNewPost.id); - } -})(); diff --git a/old_node_src/nostr.ts b/old_node_src/nostr.ts deleted file mode 100644 index fea4ec3..0000000 --- a/old_node_src/nostr.ts +++ /dev/null @@ -1,106 +0,0 @@ -import pino from "pino"; -import WebSocket from "ws"; -import { MicroBlogBackend } from "./pocketbase"; - -const logger = pino(); -const pb = new MicroBlogBackend(logger); - -const fetcherNpub = process.env.NOSTR_FETCHER_NPUB; -const myNpub = process.env.NOSTR_ID; - -if (!fetcherNpub) { - throw new Error("NOSTR_FETCHER_NPUB is not set"); -} - -if (!myNpub) { - throw new Error("NOSTR_ID is not set"); -} - -type NostrTag = ["t" | "r" | "imeta", string]; -type NostrEvent = [ - "EVENT" | "EOSE", - string, - { - id: string; - pubkey: string; - created_at: number; - kind: number; // 1 - tags: NostrTag[]; - content: string; - sig: string; - }, -]; - -(async () => { - logger.info("Starting Nostr Fetcher"); - - // figure out when the last post of wasved - const lastSavedPost = await pb.getLatestPostBySource("nostr"); - if (!lastSavedPost) { - throw new Error("No last saved nostr post found"); - } - let since: number | undefined; - since = new Date(lastSavedPost.posted).getTime() / 1000; - logger.info( - { lastSavedPostId: lastSavedPost.id, since }, - "lastSavedPost nostr post", - ); - // listen for new events for 30 seconds - logger.info("trying to connecting to nostr relay"); - const relay = process.env.NOSTR_RELAY; - if (!relay) { - throw new Error("No NOSTR_RELAY environment variable found"); - } - - const events: NostrEvent[] = []; - const ws = new WebSocket(relay); - ws.on("error", logger.error); - ws.on("message", function message(data: Buffer) { - const decodedData = JSON.parse( - Buffer.from(data).toString("utf8"), - ) as NostrEvent; - logger.info({ decodedData }, "recived a message from nostr relay"); - if (decodedData[0] === "EVENT") { - events.push(decodedData); - } - }); - ws.on("open", function open() { - logger.info("connection established"); - ws.send( - JSON.stringify([ - "REQ", - fetcherNpub, - { kinds: [1], authors: [myNpub], ...(since ? { since } : {}) }, - ]), - ); - }); - await new Promise((resolve) => setTimeout(resolve, 30000)); - logger.info("closing connection to nostr relay"); - ws.close(); - logger.info({ count: events.length }, "saving nostr posts"); - for (const event of events) { - const post = await pb.savePost({ - remoteId: event[2].id, - fullPost: event[2], - posted: new Date(event[2].created_at * 1000).toISOString(), - source: "nostr", - authorId: event[1], - }); - - for (const tag of event[2].tags) { - if (tag[0] === "t") { - await pb.setTag(tag[1], post.id); - } else if (tag[0] === "imeta") { - const value = tag[1]; - // remove "url " from the start of the string - const url = value.slice(4); - await pb.saveAndSetImage( - { - remoteURL: url, - }, - post.id, - ); - } - } - } -})(); diff --git a/old_node_src/pixelfed.ts b/old_node_src/pixelfed.ts deleted file mode 100644 index a2ce146..0000000 --- a/old_node_src/pixelfed.ts +++ /dev/null @@ -1,107 +0,0 @@ -import pino from "pino"; -import { lambdaRequestTracker, pinoLambdaDestination } from "pino-lambda"; -import { MicroBlogBackend } from "./pocketbase"; - -// custom destination formatter -const destination = pinoLambdaDestination(); -const logger = pino({}, destination); -const withRequest = lambdaRequestTracker(); - -const baseURL = process.env.PIXELFED_BASE_URL; -const accountID = process.env.PIXELFED_ACCOUNT_ID; - -const pb = new MicroBlogBackend(logger); - -type PixelFedPost = { - media_attachments: { - type: string; //'image', - url: string; // 'https://gram.social/storage/m/_v2/703621281309160235/530d83cd3-f15549/FR41GdSiUQY0/bNHMrzuQkuhXKKfR1zG4HHcjFTe6G2YF02SOr2zi.jpg', - description: string; // 'Blurry gate', - }[]; - id: string; - content: string; - account: { - id: string; - }; - created_at: string; - tags: { name: string }[]; -}; - -const getPostUntilId = async ({ - lastSavedId, - maxId, - carryPosts = [], -}: { - lastSavedId?: string; - maxId?: string; - carryPosts?: PixelFedPost[]; -}): Promise => { - const params = new URLSearchParams(); - params.append("limit", "5"); - params.append("only_media", "true"); - if (maxId) { - params.append("max_id", maxId); - } - - const urlWithParams = new URL(`${baseURL}${accountID}/statuses`); - urlWithParams.search = params.toString(); - - const res = await fetch(urlWithParams.toString()); - const posts = (await res.json()) as PixelFedPost[]; - const containsId = posts.some((post) => post.id === lastSavedId); - - if (!containsId && posts.length >= 5) { - return getPostUntilId({ - lastSavedId, - carryPosts: carryPosts?.concat(posts), - maxId: posts[posts.length - 1]?.id, - }); - } - - const allPosts = carryPosts?.concat(posts).reverse(); - if (lastSavedId) { - const index = allPosts.findIndex((post) => post.id === lastSavedId); - return allPosts.slice(index + 1); - } - return allPosts; -}; - -const savePost = async (post: PixelFedPost) => { - const postData = { - remoteId: post.id, - authorId: post.account.id, - posted: post.created_at, - source: "pixelfed" as const, - fullPost: post, - }; - return await pb.savePost(postData); -}; - -const saveTags = async (post: PixelFedPost, postId: string) => { - logger.info({ tags: post.tags }, "saving tags"); - for (const tag of post.tags) { - await pb.setTag(tag.name, postId); - } -}; - -const saveImages = async (post: PixelFedPost, postId: string) => { - logger.info({ images: post.media_attachments }, "saving images"); - for (const image of post.media_attachments) { - await pb.saveAndSetImage( - { remoteURL: image.url, alt: image.description }, - postId - ); - } -}; - -exports.run = async (event: any, context: any) => { - withRequest(event, context); - const lastSavedPostId = await pb.getLatestPostRemoteIDBySource("pixelfed"); - const posts = await getPostUntilId({ lastSavedId: lastSavedPostId }); - for (const post of posts) { - logger.info({ post }, "saving post"); - const savedNewPost = await savePost(post); - await saveTags(post, savedNewPost.id); - await saveImages(post, savedNewPost.id); - } -}; diff --git a/old_node_src/pocketbase.ts b/old_node_src/pocketbase.ts deleted file mode 100644 index fed568a..0000000 --- a/old_node_src/pocketbase.ts +++ /dev/null @@ -1,223 +0,0 @@ -import type { Logger } from "pino"; -import PocketBase from "pocketbase"; - -export type MicroBlogPostImage = { - id: string; - collectionId: string; - image: string; - alt?: string; - remoteURL: string; -}; - -export type MicroBlogPostTag = { - id: string; - tag: string; -}; - -export type MicroBlogPostSource = - | "blue_sky" - | "mastodon" - | "pleroma" - | "pixelfed" - | "nostr"; - -export type MicroBlogPost = { - source: MicroBlogPostSource; - fullPost: any; - remoteId: string; - authorId: string; - id: string; - posted: string; - expand: { - images?: MicroBlogPostImage[]; - tags?: { - id: string; - }[]; - }; -}; - -export class MicroBlogBackend { - private pb: PocketBase; - private clientSetTime?: Date; - constructor(private logger: Logger) { - this.pb = new PocketBase(process.env.POCKET_BASE_HOST); - } - - private async login() { - const pw = process.env.POCKET_BASE_PW; - const userName = process.env.POCKET_BASE_USER; - if (!pw) { - this.logger.error("POCKET_BASE_PW env var not set"); - throw new Error("POCKET_BASE_PW env var not set"); - } - if (!userName) { - this.logger.error("POCKET_BASE_USER env var not set"); - throw new Error("POCKET_BASE_USER env var not set"); - } - this.logger.info({ userName }, "Logging in to pocketbase"); - await this.pb.collection("users").authWithPassword(userName, pw); - this.clientSetTime = new Date(); - } - - private async checkLogin() { - if (!this.clientSetTime) { - await this.login(); - return; - } - const now = new Date(); - const diff = now.getTime() - this.clientSetTime.getTime(); - const day = 86_400_000; - - if (diff > day) { - await this.login(); - return; - } - } - - public async getLatestPostBySource( - postSource: MicroBlogPostSource, - ): Promise { - await this.checkLogin(); - try { - const post = await this.pb - .collection("micro_blog_posts") - .getFirstListItem(`source = '${postSource}'`, { sort: "-posted" }); - return post; - } catch (error: any) { - if (error.status === 404) { - return undefined; - } - throw error; - } - } - - public async getLatestPostRemoteIDBySource(postSource: MicroBlogPostSource) { - await this.checkLogin(); - const post = await this.getLatestPostBySource(postSource); - return post?.remoteId; - } - - async getTag(tag: string): Promise { - await this.checkLogin(); - try { - const remoteTag = await this.pb - .collection("micro_blog_tags") - .getFirstListItem(`tag = '${tag}'`); - return remoteTag; - } catch (e: any) { - if (e.status === 404) { - return undefined; - } - throw e; - } - } - - async getImageByRemoteURL( - remoteURL: string, - ): Promise { - await this.checkLogin(); - try { - const remoteImage = await this.pb - .collection("micro_blog_images") - .getFirstListItem(`remoteURL = '${remoteURL}'`); - return remoteImage; - } catch (e: any) { - if (e.status === 404) { - return undefined; - } - throw e; - } - } - - private async checkForPost( - remoteId: string, - ): Promise { - await this.checkLogin(); - try { - return await this.pb - .collection("micro_blog_posts") - .getFirstListItem(`remoteId = '${remoteId}'`); - } catch (e: any) { - if (e.status === 404) { - return undefined; - } - throw e; - } - } - - public async savePost( - post: Omit, - ): Promise { - await this.checkLogin(); - const existingPost = await this.checkForPost(post.remoteId); - if (!existingPost) { - return await this.pb - .collection("micro_blog_posts") - .create(post); - } - this.logger.info({ existingPost }, "Found existing post"); - return existingPost; - } - - public async setTag(rawTag: string, postId: string) { - await this.checkLogin(); - let tag = await this.getTag(rawTag); - if (!tag) { - tag = await this.pb - .collection("micro_blog_tags") - .create({ tag: rawTag }); - } - if (!tag) { - throw new Error("Failed to create tag"); - } - await this.pb.collection("micro_blog_posts").update(postId, { - "tags+": tag.id, - }); - } - - public async saveAndSetImage( - imageToSave: Omit, - postId: string, - ) { - await this.checkLogin(); - let image = await this.getImageByRemoteURL(imageToSave.remoteURL); - if (!image) { - const imageResponse = await fetch(imageToSave.remoteURL); - if (!imageResponse.ok) { - throw new Error("Failed to download image"); - } - const imageBlob = await imageResponse.blob(); - const imageFile = new File([imageBlob], "image.jpg", { - type: imageBlob.type, - }); - const data = { - ...imageToSave, - image: imageFile, - }; - image = await this.pb - .collection("micro_blog_images") - .create(data); - this.logger.info({ image }, "Created image"); - } - if (!image) { - throw new Error("Failed to create image"); - } - const res = await this.pb.collection("micro_blog_posts").update(postId, { - "images+": image.id, - }); - this.logger.info({ res }, "Updated post with image"); - } - - public async getPosts(page: number, limit = 20) { - await this.checkLogin(); - const resultList = await this.pb - .collection("micro_blog_posts") - .getList(page, limit, { - sort: "-posted", - expand: "images,tags", - filter: `(source = "blue_sky" || source = "pleroma")`, - // filter: 'source = "pleroma"', - }); - return resultList; - } -} diff --git a/src/micro_blog/blue_sky.clj b/src/micro_blog/blue_sky.clj index d491dac..08ff276 100644 --- a/src/micro_blog/blue_sky.clj +++ b/src/micro_blog/blue_sky.clj @@ -47,7 +47,7 @@ (defn get-posts-until-id ([session id] (get-posts-until-id session id nil [])) ([session id cursor prev-posts] - (tel/log! {:level :info :data {:postId (:remoteId id)}} "Getting posts until id") + (tel/log! {:level :info :data {:id id, :cursor cursor, :prev-posts (count prev-posts)}} "Getting posts until id") (let [limit 5 body (-> (http-client/get (str (@config :blue-sky-host) "/app.bsky.feed.getAuthorFeed") diff --git a/src/micro_blog/pocket_base.clj b/src/micro_blog/pocket_base.clj index 515ac85..ec4bd3b 100644 --- a/src/micro_blog/pocket_base.clj +++ b/src/micro_blog/pocket_base.clj @@ -190,17 +190,21 @@ (if (post-with-remote-id-already-saved? (:remoteId post)) (tel/log! {:level :warn :data {:remoteId (:remoteId post)}} "Post already saved, skipping") (try - (http-client/post (str (@config :pocket-base-host) "/api/collections/micro_blog_posts/records") - {:headers {"Authorization" (get-login-token-with-cache)} - :form-params (assoc post - :tags (map get-tag-id (:tags post)) - :images (map #(apply get-image-id %) (:images post))) - :content-type :json - :as :json}) + (let [tag-ids (doall (map get-tag-id (:tags post))) + image-ids (doall (map #(apply get-image-id %) (:images post)))] + (http-client/post (str (@config :pocket-base-host) "/api/collections/micro_blog_posts/records") + {:headers {"Authorization" (get-login-token-with-cache)} + :form-params (assoc post + :source (name (:source post)) + :isTech (name (:isTech post)) + :tags tag-ids + :images image-ids) + :content-type :json + :as :json})) (catch Exception e (tel/log! {:level :error :id :pocket-base/save-post-error - :data {:remoteId (:remoteId post) + :data {:post post :error (.getMessage e) :exception e}} "Error saving post to pocketbase")