diff --git a/README.md b/README.md index ee89b7d..d523f33 100644 --- a/README.md +++ b/README.md @@ -12,21 +12,8 @@ instance. - [x] BlueSky - [ ] Pixelfed - [x] Mastodon -- [x] Nostr +- [ ] Nostr -## Dev - -Run the app locally: - -```shell -$ clj -M -m micro-blog.main -``` - -or just run parts via repl: - -```shell -$ clj -``` ## Deployment diff --git a/build.sh b/build.sh index 6330d68..a5f2a26 100755 --- a/build.sh +++ b/build.sh @@ -3,14 +3,7 @@ set -e export AWS_PROFILE=personal -export AWS_REGION=eu-central-1 -REPO_NAME="micro-blog-fetchers-homelabstack" +docker buildx build --platform linux/amd64,linux/arm64 -t 853019563312.dkr.ecr.eu-central-1.amazonaws.com/micro-blog-fetchers-homelabstack:latest --push . -if ! aws ecr describe-repositories --repository-names "$REPO_NAME" >/dev/null 2>&1; then - aws ecr create-repository --repository-name "$REPO_NAME" -fi - -docker buildx build --platform linux/amd64,linux/arm64 -t "853019563312.dkr.ecr.eu-central-1.amazonaws.com/$REPO_NAME:latest" --push . - -echo "Docker image built and pushed to AWS ECR" +echo "Image pushed to ECR" diff --git a/old_node_src/bluesky.ts b/old_node_src/bluesky.ts new file mode 100644 index 0000000..98e62a3 --- /dev/null +++ b/old_node_src/bluesky.ts @@ -0,0 +1,155 @@ +import pino from "pino"; +import { MicroBlogBackend } from "./pocketbase"; + +// logger +const logger = pino(); + +// pocketbase +const pb = new MicroBlogBackend(logger); + +type Session = { + did: string; + accessJwt: string; + refreshJwt: string; +}; + +type BlueSkyPost = { + cid: string; + embed?: { + images: { + fullsize: string; + alt: string; + }[]; + }; + record: { + createdAt: string; // '2024-06-25T05:32:06.269Z', + // embed: { '$type': 'app.bsky.embed.images', images: [Array] }, + // facets: [ [Object] ], + text: string; + facets: { + features: { + $type: string; //"app.bsky.richtext.facet#tag",\n' + + tag?: string; // "cooking"\n' + + }[]; + }[]; + }; +}; + +const createSession = async (): Promise => { + const identifier = process.env.BLUE_SKY_USERNAME; + const apiKey = process.env.BLUE_SKY_API_KEY; + const body = JSON.stringify({ identifier, password: apiKey }); + const url = "https://bsky.social/xrpc/com.atproto.server.createSession"; + const res = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: body, + }); + const data = (await res.json()) as Session; + return data; +}; + +const limit = 10; +const getPostsUntilID = async ( + session: Session, + id: string, + cursor: string | null = null, + oldFeed: BlueSkyPost[] = [], +): Promise => { + const params = new URLSearchParams(); + params.append("actor", session.did); + params.append("limit", limit.toString()); + if (cursor) { + params.append("cursor", cursor); + } + + const urlWithParams = new URL( + "https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed", + ); + urlWithParams.search = params.toString(); + + const res = await fetch(urlWithParams, { + headers: { + Accept: "application/json", + Authorization: `Bearer ${session.accessJwt}`, + }, + }); + const rawData = (await res.json()) as { + feed: { post: BlueSkyPost }[]; + cursor: string | null; + }; + const rawFeed = rawData.feed; + const feed = rawFeed.map((item) => item.post); + cursor = rawData?.cursor; + const filteredFeed = []; + for (const post of feed) { + if (post.cid === id) { + break; + } + filteredFeed.push(post); + } + + // the post id we are searching until is in the res so return + if (filteredFeed.length !== feed.length) { + return filteredFeed.concat(oldFeed); + } + // there are more posts to add before the id + if (feed.length === limit) { + return getPostsUntilID(session, id, cursor, oldFeed.concat(feed)); + } + + // the id was not found in the feed, return everything + return oldFeed.concat(feed); +}; + +const savePost = async (post: BlueSkyPost) => { + const postData = { + remoteId: post.cid, + posted: post.record.createdAt, + source: "blue_sky" as const, + fullPost: post, + authorId: "travisshears.bsky.social", + }; + return await pb.savePost(postData); +}; + +const saveTags = async (post: BlueSkyPost, postId: string) => { + for (const facet of post.record.facets) { + for (const feature of facet.features) { + if (feature.$type === "app.bsky.richtext.facet#tag") { + const tag = feature.tag; + if (tag) { + await pb.setTag(tag, postId); + } + } + } + } +}; + +const saveImages = async (post: BlueSkyPost, postId: string) => { + const images = post.embed?.images ?? []; + for (const image of images) { + await pb.saveAndSetImage( + { remoteURL: image.fullsize, alt: image.alt }, + postId, + ); + } +}; + +(async () => { + const session = await createSession(); + const lastSavedPostId = await pb.getLatestPostRemoteIDBySource("blue_sky"); + const posts = await getPostsUntilID(session, lastSavedPostId ?? ""); + posts.reverse(); // save the oldest post first so if we fail posts are not lost on the next run + if (posts.length === 0) { + logger.info("No new posts to save"); + } + for (const post of posts) { + logger.info({ post }, "saving post"); + const savedNewPost = await savePost(post); + await saveTags(post, savedNewPost.id); + await saveImages(post, savedNewPost.id); + } +})(); diff --git a/old_node_src/mastodon.ts b/old_node_src/mastodon.ts new file mode 100644 index 0000000..ef14cc6 --- /dev/null +++ b/old_node_src/mastodon.ts @@ -0,0 +1,103 @@ +import pino from "pino"; +import { MicroBlogBackend } from "./pocketbase"; + +// custom destination formatter +const logger = pino(); + +const pb = new MicroBlogBackend(logger); + +const baseURL = process.env.MASTODON_BASE_URL; +const accountId = process.env.MASTODON_ACCOUNT_ID; + +type MastodonPost = { + media_attachments: { + type: string; //'image', + url: string; + description: string; // 'Blurry gate', + }[]; + id: string; + content: string; + account: { + id: string; + }; + created_at: string; + tags: { name: string }[]; +}; + +const getPostUntilId = async ({ + lastSavedId, + maxId, + carryPosts = [], +}: { + lastSavedId?: string; + maxId?: string; + carryPosts?: MastodonPost[]; +}): Promise => { + const params = new URLSearchParams(); + params.append("limit", "10"); + const urlWithParams = new URL( + `${baseURL}/api/v1/accounts/${accountId}/statuses`, + ); + urlWithParams.search = params.toString(); + + const res = await fetch(urlWithParams); + const posts = (await res.json()) as MastodonPost[]; + const containsId = posts.some((post) => post.id === lastSavedId); + + if (!containsId && posts.length >= 5) { + return getPostUntilId({ + lastSavedId, + carryPosts: carryPosts?.concat(posts), + maxId: posts[posts.length - 1]?.id, + }); + } + + const allPosts = carryPosts?.concat(posts).reverse(); + if (lastSavedId) { + const index = allPosts.findIndex((post) => post.id === lastSavedId); + return allPosts.slice(index + 1); + } + return allPosts; +}; + +const savePost = async (post: MastodonPost) => { + const postData = { + remoteId: post.id, + authorId: post.account.id, + posted: post.created_at, + source: "mastodon" as const, + fullPost: post, + }; + return await pb.savePost(postData); +}; + +const saveTags = async (post: MastodonPost, postId: string) => { + logger.info({ tags: post.tags }, "saving tags"); + for (const tag of post.tags) { + await pb.setTag(tag.name, postId); + } +}; + +const saveImages = async (post: MastodonPost, postId: string) => { + logger.info({ images: post.media_attachments }, "saving images"); + for (const image of post.media_attachments) { + await pb.saveAndSetImage( + { remoteURL: image.url, alt: image.description }, + postId, + ); + } +}; + +(async () => { + const lastSavedPostId = await pb.getLatestPostRemoteIDBySource("mastodon"); + console.log({ lastSavedPostId }); + const posts = await getPostUntilId({ lastSavedId: lastSavedPostId }); + console.log({ posts }); + + for (const post of posts) { + logger.info({ post }, "saving post"); + const savedNewPost = await savePost(post); + await saveTags(post, savedNewPost.id); + await saveImages(post, savedNewPost.id); + } +})(); diff --git a/old_node_src/nostr.ts b/old_node_src/nostr.ts new file mode 100644 index 0000000..fea4ec3 --- /dev/null +++ b/old_node_src/nostr.ts @@ -0,0 +1,106 @@ +import pino from "pino"; +import WebSocket from "ws"; +import { MicroBlogBackend } from "./pocketbase"; + +const logger = pino(); +const pb = new MicroBlogBackend(logger); + +const fetcherNpub = process.env.NOSTR_FETCHER_NPUB; +const myNpub = process.env.NOSTR_ID; + +if (!fetcherNpub) { + throw new Error("NOSTR_FETCHER_NPUB is not set"); +} + +if (!myNpub) { + throw new Error("NOSTR_ID is not set"); +} + +type NostrTag = ["t" | "r" | "imeta", string]; +type NostrEvent = [ + "EVENT" | "EOSE", + string, + { + id: string; + pubkey: string; + created_at: number; + kind: number; // 1 + tags: NostrTag[]; + content: string; + sig: string; + }, +]; + +(async () => { + logger.info("Starting Nostr Fetcher"); + + // figure out when the last post of wasved + const lastSavedPost = await pb.getLatestPostBySource("nostr"); + if (!lastSavedPost) { + throw new Error("No last saved nostr post found"); + } + let since: number | undefined; + since = new Date(lastSavedPost.posted).getTime() / 1000; + logger.info( + { lastSavedPostId: lastSavedPost.id, since }, + "lastSavedPost nostr post", + ); + // listen for new events for 30 seconds + logger.info("trying to connecting to nostr relay"); + const relay = process.env.NOSTR_RELAY; + if (!relay) { + throw new Error("No NOSTR_RELAY environment variable found"); + } + + const events: NostrEvent[] = []; + const ws = new WebSocket(relay); + ws.on("error", logger.error); + ws.on("message", function message(data: Buffer) { + const decodedData = JSON.parse( + Buffer.from(data).toString("utf8"), + ) as NostrEvent; + logger.info({ decodedData }, "recived a message from nostr relay"); + if (decodedData[0] === "EVENT") { + events.push(decodedData); + } + }); + ws.on("open", function open() { + logger.info("connection established"); + ws.send( + JSON.stringify([ + "REQ", + fetcherNpub, + { kinds: [1], authors: [myNpub], ...(since ? { since } : {}) }, + ]), + ); + }); + await new Promise((resolve) => setTimeout(resolve, 30000)); + logger.info("closing connection to nostr relay"); + ws.close(); + logger.info({ count: events.length }, "saving nostr posts"); + for (const event of events) { + const post = await pb.savePost({ + remoteId: event[2].id, + fullPost: event[2], + posted: new Date(event[2].created_at * 1000).toISOString(), + source: "nostr", + authorId: event[1], + }); + + for (const tag of event[2].tags) { + if (tag[0] === "t") { + await pb.setTag(tag[1], post.id); + } else if (tag[0] === "imeta") { + const value = tag[1]; + // remove "url " from the start of the string + const url = value.slice(4); + await pb.saveAndSetImage( + { + remoteURL: url, + }, + post.id, + ); + } + } + } +})(); diff --git a/old_node_src/pixelfed.ts b/old_node_src/pixelfed.ts new file mode 100644 index 0000000..a2ce146 --- /dev/null +++ b/old_node_src/pixelfed.ts @@ -0,0 +1,107 @@ +import pino from "pino"; +import { lambdaRequestTracker, pinoLambdaDestination } from "pino-lambda"; +import { MicroBlogBackend } from "./pocketbase"; + +// custom destination formatter +const destination = pinoLambdaDestination(); +const logger = pino({}, destination); +const withRequest = lambdaRequestTracker(); + +const baseURL = process.env.PIXELFED_BASE_URL; +const accountID = process.env.PIXELFED_ACCOUNT_ID; + +const pb = new MicroBlogBackend(logger); + +type PixelFedPost = { + media_attachments: { + type: string; //'image', + url: string; // 'https://gram.social/storage/m/_v2/703621281309160235/530d83cd3-f15549/FR41GdSiUQY0/bNHMrzuQkuhXKKfR1zG4HHcjFTe6G2YF02SOr2zi.jpg', + description: string; // 'Blurry gate', + }[]; + id: string; + content: string; + account: { + id: string; + }; + created_at: string; + tags: { name: string }[]; +}; + +const getPostUntilId = async ({ + lastSavedId, + maxId, + carryPosts = [], +}: { + lastSavedId?: string; + maxId?: string; + carryPosts?: PixelFedPost[]; +}): Promise => { + const params = new URLSearchParams(); + params.append("limit", "5"); + params.append("only_media", "true"); + if (maxId) { + params.append("max_id", maxId); + } + + const urlWithParams = new URL(`${baseURL}${accountID}/statuses`); + urlWithParams.search = params.toString(); + + const res = await fetch(urlWithParams.toString()); + const posts = (await res.json()) as PixelFedPost[]; + const containsId = posts.some((post) => post.id === lastSavedId); + + if (!containsId && posts.length >= 5) { + return getPostUntilId({ + lastSavedId, + carryPosts: carryPosts?.concat(posts), + maxId: posts[posts.length - 1]?.id, + }); + } + + const allPosts = carryPosts?.concat(posts).reverse(); + if (lastSavedId) { + const index = allPosts.findIndex((post) => post.id === lastSavedId); + return allPosts.slice(index + 1); + } + return allPosts; +}; + +const savePost = async (post: PixelFedPost) => { + const postData = { + remoteId: post.id, + authorId: post.account.id, + posted: post.created_at, + source: "pixelfed" as const, + fullPost: post, + }; + return await pb.savePost(postData); +}; + +const saveTags = async (post: PixelFedPost, postId: string) => { + logger.info({ tags: post.tags }, "saving tags"); + for (const tag of post.tags) { + await pb.setTag(tag.name, postId); + } +}; + +const saveImages = async (post: PixelFedPost, postId: string) => { + logger.info({ images: post.media_attachments }, "saving images"); + for (const image of post.media_attachments) { + await pb.saveAndSetImage( + { remoteURL: image.url, alt: image.description }, + postId + ); + } +}; + +exports.run = async (event: any, context: any) => { + withRequest(event, context); + const lastSavedPostId = await pb.getLatestPostRemoteIDBySource("pixelfed"); + const posts = await getPostUntilId({ lastSavedId: lastSavedPostId }); + for (const post of posts) { + logger.info({ post }, "saving post"); + const savedNewPost = await savePost(post); + await saveTags(post, savedNewPost.id); + await saveImages(post, savedNewPost.id); + } +}; diff --git a/old_node_src/pocketbase.ts b/old_node_src/pocketbase.ts new file mode 100644 index 0000000..fed568a --- /dev/null +++ b/old_node_src/pocketbase.ts @@ -0,0 +1,223 @@ +import type { Logger } from "pino"; +import PocketBase from "pocketbase"; + +export type MicroBlogPostImage = { + id: string; + collectionId: string; + image: string; + alt?: string; + remoteURL: string; +}; + +export type MicroBlogPostTag = { + id: string; + tag: string; +}; + +export type MicroBlogPostSource = + | "blue_sky" + | "mastodon" + | "pleroma" + | "pixelfed" + | "nostr"; + +export type MicroBlogPost = { + source: MicroBlogPostSource; + fullPost: any; + remoteId: string; + authorId: string; + id: string; + posted: string; + expand: { + images?: MicroBlogPostImage[]; + tags?: { + id: string; + }[]; + }; +}; + +export class MicroBlogBackend { + private pb: PocketBase; + private clientSetTime?: Date; + constructor(private logger: Logger) { + this.pb = new PocketBase(process.env.POCKET_BASE_HOST); + } + + private async login() { + const pw = process.env.POCKET_BASE_PW; + const userName = process.env.POCKET_BASE_USER; + if (!pw) { + this.logger.error("POCKET_BASE_PW env var not set"); + throw new Error("POCKET_BASE_PW env var not set"); + } + if (!userName) { + this.logger.error("POCKET_BASE_USER env var not set"); + throw new Error("POCKET_BASE_USER env var not set"); + } + this.logger.info({ userName }, "Logging in to pocketbase"); + await this.pb.collection("users").authWithPassword(userName, pw); + this.clientSetTime = new Date(); + } + + private async checkLogin() { + if (!this.clientSetTime) { + await this.login(); + return; + } + const now = new Date(); + const diff = now.getTime() - this.clientSetTime.getTime(); + const day = 86_400_000; + + if (diff > day) { + await this.login(); + return; + } + } + + public async getLatestPostBySource( + postSource: MicroBlogPostSource, + ): Promise { + await this.checkLogin(); + try { + const post = await this.pb + .collection("micro_blog_posts") + .getFirstListItem(`source = '${postSource}'`, { sort: "-posted" }); + return post; + } catch (error: any) { + if (error.status === 404) { + return undefined; + } + throw error; + } + } + + public async getLatestPostRemoteIDBySource(postSource: MicroBlogPostSource) { + await this.checkLogin(); + const post = await this.getLatestPostBySource(postSource); + return post?.remoteId; + } + + async getTag(tag: string): Promise { + await this.checkLogin(); + try { + const remoteTag = await this.pb + .collection("micro_blog_tags") + .getFirstListItem(`tag = '${tag}'`); + return remoteTag; + } catch (e: any) { + if (e.status === 404) { + return undefined; + } + throw e; + } + } + + async getImageByRemoteURL( + remoteURL: string, + ): Promise { + await this.checkLogin(); + try { + const remoteImage = await this.pb + .collection("micro_blog_images") + .getFirstListItem(`remoteURL = '${remoteURL}'`); + return remoteImage; + } catch (e: any) { + if (e.status === 404) { + return undefined; + } + throw e; + } + } + + private async checkForPost( + remoteId: string, + ): Promise { + await this.checkLogin(); + try { + return await this.pb + .collection("micro_blog_posts") + .getFirstListItem(`remoteId = '${remoteId}'`); + } catch (e: any) { + if (e.status === 404) { + return undefined; + } + throw e; + } + } + + public async savePost( + post: Omit, + ): Promise { + await this.checkLogin(); + const existingPost = await this.checkForPost(post.remoteId); + if (!existingPost) { + return await this.pb + .collection("micro_blog_posts") + .create(post); + } + this.logger.info({ existingPost }, "Found existing post"); + return existingPost; + } + + public async setTag(rawTag: string, postId: string) { + await this.checkLogin(); + let tag = await this.getTag(rawTag); + if (!tag) { + tag = await this.pb + .collection("micro_blog_tags") + .create({ tag: rawTag }); + } + if (!tag) { + throw new Error("Failed to create tag"); + } + await this.pb.collection("micro_blog_posts").update(postId, { + "tags+": tag.id, + }); + } + + public async saveAndSetImage( + imageToSave: Omit, + postId: string, + ) { + await this.checkLogin(); + let image = await this.getImageByRemoteURL(imageToSave.remoteURL); + if (!image) { + const imageResponse = await fetch(imageToSave.remoteURL); + if (!imageResponse.ok) { + throw new Error("Failed to download image"); + } + const imageBlob = await imageResponse.blob(); + const imageFile = new File([imageBlob], "image.jpg", { + type: imageBlob.type, + }); + const data = { + ...imageToSave, + image: imageFile, + }; + image = await this.pb + .collection("micro_blog_images") + .create(data); + this.logger.info({ image }, "Created image"); + } + if (!image) { + throw new Error("Failed to create image"); + } + const res = await this.pb.collection("micro_blog_posts").update(postId, { + "images+": image.id, + }); + this.logger.info({ res }, "Updated post with image"); + } + + public async getPosts(page: number, limit = 20) { + await this.checkLogin(); + const resultList = await this.pb + .collection("micro_blog_posts") + .getList(page, limit, { + sort: "-posted", + expand: "images,tags", + filter: `(source = "blue_sky" || source = "pleroma")`, + // filter: 'source = "pleroma"', + }); + return resultList; + } +} diff --git a/src/micro_blog/api.clj b/src/micro_blog/api.clj index f28b078..b200cb6 100644 --- a/src/micro_blog/api.clj +++ b/src/micro_blog/api.clj @@ -17,7 +17,7 @@ (defn blue-sky-proc-handler [_request] (let [msg "Procing BlueSky Scrape"] (tel/log! :info msg) - (micro-blog.blue-sky/run) + (micro-blog.mastodon/run) {:status 200 :body msg})) diff --git a/src/micro_blog/blue_sky.clj b/src/micro_blog/blue_sky.clj index 08ff276..d491dac 100644 --- a/src/micro_blog/blue_sky.clj +++ b/src/micro_blog/blue_sky.clj @@ -47,7 +47,7 @@ (defn get-posts-until-id ([session id] (get-posts-until-id session id nil [])) ([session id cursor prev-posts] - (tel/log! {:level :info :data {:id id, :cursor cursor, :prev-posts (count prev-posts)}} "Getting posts until id") + (tel/log! {:level :info :data {:postId (:remoteId id)}} "Getting posts until id") (let [limit 5 body (-> (http-client/get (str (@config :blue-sky-host) "/app.bsky.feed.getAuthorFeed") diff --git a/src/micro_blog/pocket_base.clj b/src/micro_blog/pocket_base.clj index ec4bd3b..515ac85 100644 --- a/src/micro_blog/pocket_base.clj +++ b/src/micro_blog/pocket_base.clj @@ -190,21 +190,17 @@ (if (post-with-remote-id-already-saved? (:remoteId post)) (tel/log! {:level :warn :data {:remoteId (:remoteId post)}} "Post already saved, skipping") (try - (let [tag-ids (doall (map get-tag-id (:tags post))) - image-ids (doall (map #(apply get-image-id %) (:images post)))] - (http-client/post (str (@config :pocket-base-host) "/api/collections/micro_blog_posts/records") - {:headers {"Authorization" (get-login-token-with-cache)} - :form-params (assoc post - :source (name (:source post)) - :isTech (name (:isTech post)) - :tags tag-ids - :images image-ids) - :content-type :json - :as :json})) + (http-client/post (str (@config :pocket-base-host) "/api/collections/micro_blog_posts/records") + {:headers {"Authorization" (get-login-token-with-cache)} + :form-params (assoc post + :tags (map get-tag-id (:tags post)) + :images (map #(apply get-image-id %) (:images post))) + :content-type :json + :as :json}) (catch Exception e (tel/log! {:level :error :id :pocket-base/save-post-error - :data {:post post + :data {:remoteId (:remoteId post) :error (.getMessage e) :exception e}} "Error saving post to pocketbase")