diff --git a/README.md b/README.md index ee89b7d..d523f33 100644 --- a/README.md +++ b/README.md @@ -12,21 +12,8 @@ instance. - [x] BlueSky - [ ] Pixelfed - [x] Mastodon -- [x] Nostr +- [ ] Nostr -## Dev - -Run the app locally: - -```shell -$ clj -M -m micro-blog.main -``` - -or just run parts via repl: - -```shell -$ clj -``` ## Deployment diff --git a/build.sh b/build.sh deleted file mode 100755 index 6330d68..0000000 --- a/build.sh +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -set -e - -export AWS_PROFILE=personal -export AWS_REGION=eu-central-1 - -REPO_NAME="micro-blog-fetchers-homelabstack" - -if ! aws ecr describe-repositories --repository-names "$REPO_NAME" >/dev/null 2>&1; then - aws ecr create-repository --repository-name "$REPO_NAME" -fi - -docker buildx build --platform linux/amd64,linux/arm64 -t "853019563312.dkr.ecr.eu-central-1.amazonaws.com/$REPO_NAME:latest" --push . - -echo "Docker image built and pushed to AWS ECR" diff --git a/deps.edn b/deps.edn index 47e2f36..0a35e3d 100644 --- a/deps.edn +++ b/deps.edn @@ -15,9 +15,7 @@ ;; json cheshire/cheshire {:mvn/version "6.0.0"} ;; metosin/muuntaja {:mvn/version "0.6.11"} - org.clojure/clojure {:mvn/version "1.12.1"} - ;; websockets - hato/hato {:mvn/version "1.0.0"}} + org.clojure/clojure {:mvn/version "1.12.1"}} :aliases {;; Run with clj -T:build function-in-build diff --git a/old_node_src/bluesky.ts b/old_node_src/bluesky.ts new file mode 100644 index 0000000..98e62a3 --- /dev/null +++ b/old_node_src/bluesky.ts @@ -0,0 +1,155 @@ +import pino from "pino"; +import { MicroBlogBackend } from "./pocketbase"; + +// logger +const logger = pino(); + +// pocketbase +const pb = new MicroBlogBackend(logger); + +type Session = { + did: string; + accessJwt: string; + refreshJwt: string; +}; + +type BlueSkyPost = { + cid: string; + embed?: { + images: { + fullsize: string; + alt: string; + }[]; + }; + record: { + createdAt: string; // '2024-06-25T05:32:06.269Z', + // embed: { '$type': 'app.bsky.embed.images', images: [Array] }, + // facets: [ [Object] ], + text: string; + facets: { + features: { + $type: string; //"app.bsky.richtext.facet#tag",\n' + + tag?: string; // "cooking"\n' + + }[]; + }[]; + }; +}; + +const createSession = async (): Promise => { + const identifier = process.env.BLUE_SKY_USERNAME; + const apiKey = process.env.BLUE_SKY_API_KEY; + const body = JSON.stringify({ identifier, password: apiKey }); + const url = "https://bsky.social/xrpc/com.atproto.server.createSession"; + const res = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: body, + }); + const data = (await res.json()) as Session; + return data; +}; + +const limit = 10; +const getPostsUntilID = async ( + session: Session, + id: string, + cursor: string | null = null, + oldFeed: BlueSkyPost[] = [], +): Promise => { + const params = new URLSearchParams(); + params.append("actor", session.did); + params.append("limit", limit.toString()); + if (cursor) { + params.append("cursor", cursor); + } + + const urlWithParams = new URL( + "https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed", + ); + urlWithParams.search = params.toString(); + + const res = await fetch(urlWithParams, { + headers: { + Accept: "application/json", + Authorization: `Bearer ${session.accessJwt}`, + }, + }); + const rawData = (await res.json()) as { + feed: { post: BlueSkyPost }[]; + cursor: string | null; + }; + const rawFeed = rawData.feed; + const feed = rawFeed.map((item) => item.post); + cursor = rawData?.cursor; + const filteredFeed = []; + for (const post of feed) { + if (post.cid === id) { + break; + } + filteredFeed.push(post); + } + + // the post id we are searching until is in the res so return + if (filteredFeed.length !== feed.length) { + return filteredFeed.concat(oldFeed); + } + // there are more posts to add before the id + if (feed.length === limit) { + return getPostsUntilID(session, id, cursor, oldFeed.concat(feed)); + } + + // the id was not found in the feed, return everything + return oldFeed.concat(feed); +}; + +const savePost = async (post: BlueSkyPost) => { + const postData = { + remoteId: post.cid, + posted: post.record.createdAt, + source: "blue_sky" as const, + fullPost: post, + authorId: "travisshears.bsky.social", + }; + return await pb.savePost(postData); +}; + +const saveTags = async (post: BlueSkyPost, postId: string) => { + for (const facet of post.record.facets) { + for (const feature of facet.features) { + if (feature.$type === "app.bsky.richtext.facet#tag") { + const tag = feature.tag; + if (tag) { + await pb.setTag(tag, postId); + } + } + } + } +}; + +const saveImages = async (post: BlueSkyPost, postId: string) => { + const images = post.embed?.images ?? []; + for (const image of images) { + await pb.saveAndSetImage( + { remoteURL: image.fullsize, alt: image.alt }, + postId, + ); + } +}; + +(async () => { + const session = await createSession(); + const lastSavedPostId = await pb.getLatestPostRemoteIDBySource("blue_sky"); + const posts = await getPostsUntilID(session, lastSavedPostId ?? ""); + posts.reverse(); // save the oldest post first so if we fail posts are not lost on the next run + if (posts.length === 0) { + logger.info("No new posts to save"); + } + for (const post of posts) { + logger.info({ post }, "saving post"); + const savedNewPost = await savePost(post); + await saveTags(post, savedNewPost.id); + await saveImages(post, savedNewPost.id); + } +})(); diff --git a/old_node_src/mastodon.ts b/old_node_src/mastodon.ts new file mode 100644 index 0000000..ef14cc6 --- /dev/null +++ b/old_node_src/mastodon.ts @@ -0,0 +1,103 @@ +import pino from "pino"; +import { MicroBlogBackend } from "./pocketbase"; + +// custom destination formatter +const logger = pino(); + +const pb = new MicroBlogBackend(logger); + +const baseURL = process.env.MASTODON_BASE_URL; +const accountId = process.env.MASTODON_ACCOUNT_ID; + +type MastodonPost = { + media_attachments: { + type: string; //'image', + url: string; + description: string; // 'Blurry gate', + }[]; + id: string; + content: string; + account: { + id: string; + }; + created_at: string; + tags: { name: string }[]; +}; + +const getPostUntilId = async ({ + lastSavedId, + maxId, + carryPosts = [], +}: { + lastSavedId?: string; + maxId?: string; + carryPosts?: MastodonPost[]; +}): Promise => { + const params = new URLSearchParams(); + params.append("limit", "10"); + const urlWithParams = new URL( + `${baseURL}/api/v1/accounts/${accountId}/statuses`, + ); + urlWithParams.search = params.toString(); + + const res = await fetch(urlWithParams); + const posts = (await res.json()) as MastodonPost[]; + const containsId = posts.some((post) => post.id === lastSavedId); + + if (!containsId && posts.length >= 5) { + return getPostUntilId({ + lastSavedId, + carryPosts: carryPosts?.concat(posts), + maxId: posts[posts.length - 1]?.id, + }); + } + + const allPosts = carryPosts?.concat(posts).reverse(); + if (lastSavedId) { + const index = allPosts.findIndex((post) => post.id === lastSavedId); + return allPosts.slice(index + 1); + } + return allPosts; +}; + +const savePost = async (post: MastodonPost) => { + const postData = { + remoteId: post.id, + authorId: post.account.id, + posted: post.created_at, + source: "mastodon" as const, + fullPost: post, + }; + return await pb.savePost(postData); +}; + +const saveTags = async (post: MastodonPost, postId: string) => { + logger.info({ tags: post.tags }, "saving tags"); + for (const tag of post.tags) { + await pb.setTag(tag.name, postId); + } +}; + +const saveImages = async (post: MastodonPost, postId: string) => { + logger.info({ images: post.media_attachments }, "saving images"); + for (const image of post.media_attachments) { + await pb.saveAndSetImage( + { remoteURL: image.url, alt: image.description }, + postId, + ); + } +}; + +(async () => { + const lastSavedPostId = await pb.getLatestPostRemoteIDBySource("mastodon"); + console.log({ lastSavedPostId }); + const posts = await getPostUntilId({ lastSavedId: lastSavedPostId }); + console.log({ posts }); + + for (const post of posts) { + logger.info({ post }, "saving post"); + const savedNewPost = await savePost(post); + await saveTags(post, savedNewPost.id); + await saveImages(post, savedNewPost.id); + } +})(); diff --git a/old_node_src/nostr.ts b/old_node_src/nostr.ts new file mode 100644 index 0000000..fea4ec3 --- /dev/null +++ b/old_node_src/nostr.ts @@ -0,0 +1,106 @@ +import pino from "pino"; +import WebSocket from "ws"; +import { MicroBlogBackend } from "./pocketbase"; + +const logger = pino(); +const pb = new MicroBlogBackend(logger); + +const fetcherNpub = process.env.NOSTR_FETCHER_NPUB; +const myNpub = process.env.NOSTR_ID; + +if (!fetcherNpub) { + throw new Error("NOSTR_FETCHER_NPUB is not set"); +} + +if (!myNpub) { + throw new Error("NOSTR_ID is not set"); +} + +type NostrTag = ["t" | "r" | "imeta", string]; +type NostrEvent = [ + "EVENT" | "EOSE", + string, + { + id: string; + pubkey: string; + created_at: number; + kind: number; // 1 + tags: NostrTag[]; + content: string; + sig: string; + }, +]; + +(async () => { + logger.info("Starting Nostr Fetcher"); + + // figure out when the last post of wasved + const lastSavedPost = await pb.getLatestPostBySource("nostr"); + if (!lastSavedPost) { + throw new Error("No last saved nostr post found"); + } + let since: number | undefined; + since = new Date(lastSavedPost.posted).getTime() / 1000; + logger.info( + { lastSavedPostId: lastSavedPost.id, since }, + "lastSavedPost nostr post", + ); + // listen for new events for 30 seconds + logger.info("trying to connecting to nostr relay"); + const relay = process.env.NOSTR_RELAY; + if (!relay) { + throw new Error("No NOSTR_RELAY environment variable found"); + } + + const events: NostrEvent[] = []; + const ws = new WebSocket(relay); + ws.on("error", logger.error); + ws.on("message", function message(data: Buffer) { + const decodedData = JSON.parse( + Buffer.from(data).toString("utf8"), + ) as NostrEvent; + logger.info({ decodedData }, "recived a message from nostr relay"); + if (decodedData[0] === "EVENT") { + events.push(decodedData); + } + }); + ws.on("open", function open() { + logger.info("connection established"); + ws.send( + JSON.stringify([ + "REQ", + fetcherNpub, + { kinds: [1], authors: [myNpub], ...(since ? { since } : {}) }, + ]), + ); + }); + await new Promise((resolve) => setTimeout(resolve, 30000)); + logger.info("closing connection to nostr relay"); + ws.close(); + logger.info({ count: events.length }, "saving nostr posts"); + for (const event of events) { + const post = await pb.savePost({ + remoteId: event[2].id, + fullPost: event[2], + posted: new Date(event[2].created_at * 1000).toISOString(), + source: "nostr", + authorId: event[1], + }); + + for (const tag of event[2].tags) { + if (tag[0] === "t") { + await pb.setTag(tag[1], post.id); + } else if (tag[0] === "imeta") { + const value = tag[1]; + // remove "url " from the start of the string + const url = value.slice(4); + await pb.saveAndSetImage( + { + remoteURL: url, + }, + post.id, + ); + } + } + } +})(); diff --git a/old_node_src/pixelfed.ts b/old_node_src/pixelfed.ts new file mode 100644 index 0000000..a2ce146 --- /dev/null +++ b/old_node_src/pixelfed.ts @@ -0,0 +1,107 @@ +import pino from "pino"; +import { lambdaRequestTracker, pinoLambdaDestination } from "pino-lambda"; +import { MicroBlogBackend } from "./pocketbase"; + +// custom destination formatter +const destination = pinoLambdaDestination(); +const logger = pino({}, destination); +const withRequest = lambdaRequestTracker(); + +const baseURL = process.env.PIXELFED_BASE_URL; +const accountID = process.env.PIXELFED_ACCOUNT_ID; + +const pb = new MicroBlogBackend(logger); + +type PixelFedPost = { + media_attachments: { + type: string; //'image', + url: string; // 'https://gram.social/storage/m/_v2/703621281309160235/530d83cd3-f15549/FR41GdSiUQY0/bNHMrzuQkuhXKKfR1zG4HHcjFTe6G2YF02SOr2zi.jpg', + description: string; // 'Blurry gate', + }[]; + id: string; + content: string; + account: { + id: string; + }; + created_at: string; + tags: { name: string }[]; +}; + +const getPostUntilId = async ({ + lastSavedId, + maxId, + carryPosts = [], +}: { + lastSavedId?: string; + maxId?: string; + carryPosts?: PixelFedPost[]; +}): Promise => { + const params = new URLSearchParams(); + params.append("limit", "5"); + params.append("only_media", "true"); + if (maxId) { + params.append("max_id", maxId); + } + + const urlWithParams = new URL(`${baseURL}${accountID}/statuses`); + urlWithParams.search = params.toString(); + + const res = await fetch(urlWithParams.toString()); + const posts = (await res.json()) as PixelFedPost[]; + const containsId = posts.some((post) => post.id === lastSavedId); + + if (!containsId && posts.length >= 5) { + return getPostUntilId({ + lastSavedId, + carryPosts: carryPosts?.concat(posts), + maxId: posts[posts.length - 1]?.id, + }); + } + + const allPosts = carryPosts?.concat(posts).reverse(); + if (lastSavedId) { + const index = allPosts.findIndex((post) => post.id === lastSavedId); + return allPosts.slice(index + 1); + } + return allPosts; +}; + +const savePost = async (post: PixelFedPost) => { + const postData = { + remoteId: post.id, + authorId: post.account.id, + posted: post.created_at, + source: "pixelfed" as const, + fullPost: post, + }; + return await pb.savePost(postData); +}; + +const saveTags = async (post: PixelFedPost, postId: string) => { + logger.info({ tags: post.tags }, "saving tags"); + for (const tag of post.tags) { + await pb.setTag(tag.name, postId); + } +}; + +const saveImages = async (post: PixelFedPost, postId: string) => { + logger.info({ images: post.media_attachments }, "saving images"); + for (const image of post.media_attachments) { + await pb.saveAndSetImage( + { remoteURL: image.url, alt: image.description }, + postId + ); + } +}; + +exports.run = async (event: any, context: any) => { + withRequest(event, context); + const lastSavedPostId = await pb.getLatestPostRemoteIDBySource("pixelfed"); + const posts = await getPostUntilId({ lastSavedId: lastSavedPostId }); + for (const post of posts) { + logger.info({ post }, "saving post"); + const savedNewPost = await savePost(post); + await saveTags(post, savedNewPost.id); + await saveImages(post, savedNewPost.id); + } +}; diff --git a/old_node_src/pocketbase.ts b/old_node_src/pocketbase.ts new file mode 100644 index 0000000..fed568a --- /dev/null +++ b/old_node_src/pocketbase.ts @@ -0,0 +1,223 @@ +import type { Logger } from "pino"; +import PocketBase from "pocketbase"; + +export type MicroBlogPostImage = { + id: string; + collectionId: string; + image: string; + alt?: string; + remoteURL: string; +}; + +export type MicroBlogPostTag = { + id: string; + tag: string; +}; + +export type MicroBlogPostSource = + | "blue_sky" + | "mastodon" + | "pleroma" + | "pixelfed" + | "nostr"; + +export type MicroBlogPost = { + source: MicroBlogPostSource; + fullPost: any; + remoteId: string; + authorId: string; + id: string; + posted: string; + expand: { + images?: MicroBlogPostImage[]; + tags?: { + id: string; + }[]; + }; +}; + +export class MicroBlogBackend { + private pb: PocketBase; + private clientSetTime?: Date; + constructor(private logger: Logger) { + this.pb = new PocketBase(process.env.POCKET_BASE_HOST); + } + + private async login() { + const pw = process.env.POCKET_BASE_PW; + const userName = process.env.POCKET_BASE_USER; + if (!pw) { + this.logger.error("POCKET_BASE_PW env var not set"); + throw new Error("POCKET_BASE_PW env var not set"); + } + if (!userName) { + this.logger.error("POCKET_BASE_USER env var not set"); + throw new Error("POCKET_BASE_USER env var not set"); + } + this.logger.info({ userName }, "Logging in to pocketbase"); + await this.pb.collection("users").authWithPassword(userName, pw); + this.clientSetTime = new Date(); + } + + private async checkLogin() { + if (!this.clientSetTime) { + await this.login(); + return; + } + const now = new Date(); + const diff = now.getTime() - this.clientSetTime.getTime(); + const day = 86_400_000; + + if (diff > day) { + await this.login(); + return; + } + } + + public async getLatestPostBySource( + postSource: MicroBlogPostSource, + ): Promise { + await this.checkLogin(); + try { + const post = await this.pb + .collection("micro_blog_posts") + .getFirstListItem(`source = '${postSource}'`, { sort: "-posted" }); + return post; + } catch (error: any) { + if (error.status === 404) { + return undefined; + } + throw error; + } + } + + public async getLatestPostRemoteIDBySource(postSource: MicroBlogPostSource) { + await this.checkLogin(); + const post = await this.getLatestPostBySource(postSource); + return post?.remoteId; + } + + async getTag(tag: string): Promise { + await this.checkLogin(); + try { + const remoteTag = await this.pb + .collection("micro_blog_tags") + .getFirstListItem(`tag = '${tag}'`); + return remoteTag; + } catch (e: any) { + if (e.status === 404) { + return undefined; + } + throw e; + } + } + + async getImageByRemoteURL( + remoteURL: string, + ): Promise { + await this.checkLogin(); + try { + const remoteImage = await this.pb + .collection("micro_blog_images") + .getFirstListItem(`remoteURL = '${remoteURL}'`); + return remoteImage; + } catch (e: any) { + if (e.status === 404) { + return undefined; + } + throw e; + } + } + + private async checkForPost( + remoteId: string, + ): Promise { + await this.checkLogin(); + try { + return await this.pb + .collection("micro_blog_posts") + .getFirstListItem(`remoteId = '${remoteId}'`); + } catch (e: any) { + if (e.status === 404) { + return undefined; + } + throw e; + } + } + + public async savePost( + post: Omit, + ): Promise { + await this.checkLogin(); + const existingPost = await this.checkForPost(post.remoteId); + if (!existingPost) { + return await this.pb + .collection("micro_blog_posts") + .create(post); + } + this.logger.info({ existingPost }, "Found existing post"); + return existingPost; + } + + public async setTag(rawTag: string, postId: string) { + await this.checkLogin(); + let tag = await this.getTag(rawTag); + if (!tag) { + tag = await this.pb + .collection("micro_blog_tags") + .create({ tag: rawTag }); + } + if (!tag) { + throw new Error("Failed to create tag"); + } + await this.pb.collection("micro_blog_posts").update(postId, { + "tags+": tag.id, + }); + } + + public async saveAndSetImage( + imageToSave: Omit, + postId: string, + ) { + await this.checkLogin(); + let image = await this.getImageByRemoteURL(imageToSave.remoteURL); + if (!image) { + const imageResponse = await fetch(imageToSave.remoteURL); + if (!imageResponse.ok) { + throw new Error("Failed to download image"); + } + const imageBlob = await imageResponse.blob(); + const imageFile = new File([imageBlob], "image.jpg", { + type: imageBlob.type, + }); + const data = { + ...imageToSave, + image: imageFile, + }; + image = await this.pb + .collection("micro_blog_images") + .create(data); + this.logger.info({ image }, "Created image"); + } + if (!image) { + throw new Error("Failed to create image"); + } + const res = await this.pb.collection("micro_blog_posts").update(postId, { + "images+": image.id, + }); + this.logger.info({ res }, "Updated post with image"); + } + + public async getPosts(page: number, limit = 20) { + await this.checkLogin(); + const resultList = await this.pb + .collection("micro_blog_posts") + .getList(page, limit, { + sort: "-posted", + expand: "images,tags", + filter: `(source = "blue_sky" || source = "pleroma")`, + // filter: 'source = "pleroma"', + }); + return resultList; + } +} diff --git a/src/micro_blog/api.clj b/src/micro_blog/api.clj index f28b078..38262e1 100644 --- a/src/micro_blog/api.clj +++ b/src/micro_blog/api.clj @@ -4,11 +4,10 @@ [micro-blog.config :refer [config]] micro-blog.mastodon micro-blog.blue-sky - micro-blog.nostr [taoensso.telemere :as tel])) (defn mastodon-proc-handler [_request] - (let [msg "Procding Mastodon Scrape"] + (let [msg "Proceding Mastodon Scrape"] (tel/log! :info msg) (micro-blog.mastodon/run) {:status 200 @@ -16,23 +15,14 @@ (defn blue-sky-proc-handler [_request] (let [msg "Procing BlueSky Scrape"] - (tel/log! :info msg) - (micro-blog.blue-sky/run) - {:status 200 - :body msg})) - -(defn nostr-proc-handler [_request] - (let [msg "Restarting Nostr scraper"] - (tel/log! :info msg) - (micro-blog.nostr/close) - (micro-blog.nostr/start) - {:status 200 - :body msg})) + (tel/log! :info msg) + (micro-blog.mastodon/run) + {:status 200 + :body msg})) (def routes #{["/bluesky" :get blue-sky-proc-handler :route-name :blue-sky] - ["/mastodon" :get mastodon-proc-handler :route-name :mastodon] - ["/nostr" :get nostr-proc-handler :route-name :nostr]}) + ["/mastodon" :get mastodon-proc-handler :route-name :mastodon]}) (defn create-connector [] (-> (conn/default-connector-map (:api-host @config) (Integer/parseInt (str (:api-port @config)))) diff --git a/src/micro_blog/blue_sky.clj b/src/micro_blog/blue_sky.clj index 08ff276..1c9579c 100644 --- a/src/micro_blog/blue_sky.clj +++ b/src/micro_blog/blue_sky.clj @@ -3,7 +3,6 @@ [clj-http.client :as http-client] [micro-blog.pocket-base :as pb] [micro-blog.utils :as utils] - [cheshire.core :as json] [micro-blog.is-tech] [taoensso.telemere :as tel] [micro-blog.config :refer [config]])) @@ -47,7 +46,7 @@ (defn get-posts-until-id ([session id] (get-posts-until-id session id nil [])) ([session id cursor prev-posts] - (tel/log! {:level :info :data {:id id, :cursor cursor, :prev-posts (count prev-posts)}} "Getting posts until id") + (tel/log! {:level :info :data {:postId (:remoteId id)}} "Getting posts until id") (let [limit 5 body (-> (http-client/get (str (@config :blue-sky-host) "/app.bsky.feed.getAuthorFeed") @@ -83,11 +82,10 @@ (map #(vector (:fullsize %) (:alt %)) images))) (defn transform-post [post] - (tel/log! {:level :info :data {:post post}} "Transforming post") (hash-map :source :blue_sky :fullPost post :remoteId (:cid post) - :isTech (micro-blog.is-tech/is-tech? (json/generate-string (:record post))) + :isTech (micro-blog.is-tech/is-tech? (:record post)) :authorId (get-in post [:author :handle]) :tags (extract-tags post) :images (extract-images post) diff --git a/src/micro_blog/config.clj b/src/micro_blog/config.clj index ad9372b..c12eaef 100644 --- a/src/micro_blog/config.clj +++ b/src/micro_blog/config.clj @@ -8,11 +8,7 @@ :mastodon-host "MASTODON_BASE_URL" :mastodon-account-id "MASTODON_ACCOUNT_ID" :api-host "API_HOST" - :api-port "API_PORT" - ;; :nostr-fetcher-npub "NOSTR_FETCHER_NPUB" - :nostr-fetcher-id "NOSTR_FETCHER_PUB_HEX" - :nostr-id "NOSTR_ID" - :nostr-relay "NOSTR_RELAY"}) + :api-port "API_PORT"}) (defn- load-config [] (merge (read-string (slurp "config.edn")) diff --git a/src/micro_blog/is_tech.clj b/src/micro_blog/is_tech.clj index 49eb22d..03cd950 100644 --- a/src/micro_blog/is_tech.clj +++ b/src/micro_blog/is_tech.clj @@ -4,8 +4,7 @@ [taoensso.telemere :as tel] [clj-http.client :as client])) -(defn is-tech? [post-text] - (when (not (string? post-text)) (throw (ex-info "Post text must be a string" {:post-text post-text}))) +(defn call-mistral-api [post-text] (let [url (str (:mistral-host @config) "/v1/conversations") headers {"Content-Type" "application/json" "Accept" "application/json" @@ -13,7 +12,7 @@ body {:inputs post-text :stream false :agent_id (@config :mistral-agent-id)}] - (tel/log! {:level :info :data {:url url :agent_id (:agent_id body) :post-text post-text}} "making request to mistral agent") + (tel/log! {:level :info :data {:url url :agent_id (:agent_id body)}} "making request to mistral agent") (-> (client/post url {:headers headers :form-params body @@ -23,4 +22,7 @@ :outputs first :content - (#(if (= "1" %) :yes_ai :no))))) + (#(if (= "1" %) true false))))) + +(defn is-tech? [post-text] + (call-mistral-api post-text)) diff --git a/src/micro_blog/main.clj b/src/micro_blog/main.clj index c4ec6b9..cb84b05 100644 --- a/src/micro_blog/main.clj +++ b/src/micro_blog/main.clj @@ -5,7 +5,6 @@ [taoensso.telemere :as tel] [micro-blog.logging.main :as logging] micro-blog.api - [micro-blog.nostr :as nostr] [micro-blog.blue-sky :as blue-sky] [micro-blog.mastodon :as masto])) @@ -16,8 +15,6 @@ (logging/setup-logging) (tel/log! :info "Setting up API") (micro-blog.api/start) - (tel/log! :info "Setting up nostr scraper") - (nostr/start) (tel/log! :info "Setting up crons") (doseq [[i cron] (map-indexed vector crons)] (let [start (.plus (Instant/now) (Duration/ofMinutes (* i 5)))] diff --git a/src/micro_blog/mastodon.clj b/src/micro_blog/mastodon.clj index 24b7d0d..8b7f027 100644 --- a/src/micro_blog/mastodon.clj +++ b/src/micro_blog/mastodon.clj @@ -17,7 +17,7 @@ [:media_attachments [:vector [:map [:url :string] [:type [:= "image"]] - [:description [:maybe :string]]]]]]]) + [:description :string]]]]]]) (defn get-posts-until-id [id] (let [limit 10 @@ -50,7 +50,7 @@ :remoteId (:id raw-post) :authorId (get-in raw-post [:account :id]) :tags (map :name (:tags raw-post)) - :images (map (fn [img] [(:url img) (or (:description img) "")]) (:media_attachments raw-post)) + :images (map (fn [img] [(:url img) (:description img)]) (:media_attachments raw-post)) :posted (:created_at raw-post))) (defn save-post [post] diff --git a/src/micro_blog/nostr.clj b/src/micro_blog/nostr.clj deleted file mode 100644 index 9a5a3de..0000000 --- a/src/micro_blog/nostr.clj +++ /dev/null @@ -1,82 +0,0 @@ -(ns micro-blog.nostr - (:require - [micro-blog.pocket-base :as pb] - [micro-blog.is-tech] - [taoensso.telemere :as tel] - [hato.websocket :as ws] - [cheshire.core :as json] - [clojure.string :as str] - [micro-blog.config :refer [config]]) - (:import - [java.time Instant OffsetDateTime ZoneOffset] - [java.time.format DateTimeFormatter])) - -(defn pb-date-to-unix-timestamp-seconds [date-str] - (-> date-str - (str/replace " " "T") - (Instant/parse) - (.getEpochSecond))) - -(defn nostr-date-to-pb [ts] - (let [instant (java.time.Instant/ofEpochSecond ts) - formatter (java.time.format.DateTimeFormatter/ofPattern "yyyy-MM-dd HH:mm:ss.SSSX") - zoned (.atZone instant java.time.ZoneOffset/UTC)] - (.format formatter zoned))) - -(defn last-post-timestamp [] - (pb-date-to-unix-timestamp-seconds (:posted (pb/get-latest-post-by-source :nostr)))) - -(defn transform-post [post] - (tel/log! {:level :info :data {:post post}} "Transforming nostr post") - (hash-map :source :nostr - :fullPost post - :remoteId (get post "id") - :isTech (micro-blog.is-tech/is-tech? (get post "content")) - :authorId (get post "pubkey") - :tags (reduce (fn [acc tag] - (let [tag-type (first tag) - tag-value (second tag)] - (if (= tag-type "t") - (conj acc tag-value) - acc))) [] (get post "tags")) - :images [] - :posted (nostr-date-to-pb (get post "created_at")))) - -(defn process-msg [raw-msg] - (let [msg (json/parse-string (.toString raw-msg))] - (tel/log! {:level :info :data {:msg msg}} "Processing nostr message") - (let [msg-type (first msg) ;; ex: EVENT - event (nth msg 2)] - (when (and (= (get event "kind") 1) (= msg-type "EVENT")) - (-> event - transform-post - pb/save-post))))) - -(def socket (atom nil)) -(declare start) -(defn connect [] - (tel/log! :info "Opening websocket connection to nostr relay") - (reset! socket @(ws/websocket (@config :nostr-relay) - {:headers {"User-Agent" "micro-blog-fetcher"} - :on-message (fn [_ws msg _last?] - (process-msg msg)) - :on-close (fn [_ws status reason] - (tel/log! {:level :warn :data {:status status :reason reason}} "WebSocket connection closed") - (future - (Thread/sleep (* 5 60 1000)) - (tel/log! :info "Reconnecting WebSocket") - (start)))}))) - -(defn subscribe-to-author [pubkey since] - (let [sub-id (@config :nostr-fetcher-id) - filter {:kinds [1] :authors [pubkey] :since since} - msg (json/generate-string ["REQ" sub-id filter])] - (.get (ws/send! @socket msg)))) - -(defn close [] - (tel/log! :info "Closing nostr socket") - (ws/close! @socket)) - -(defn start [] - (connect) - (subscribe-to-author (@config :nostr-id) (last-post-timestamp))) diff --git a/src/micro_blog/pocket_base.clj b/src/micro_blog/pocket_base.clj index ec4bd3b..05056a0 100644 --- a/src/micro_blog/pocket_base.clj +++ b/src/micro_blog/pocket_base.clj @@ -1,5 +1,6 @@ (ns micro-blog.pocket-base (:require + [clojure.pprint :refer [pprint]] [clojure.string :as str] [clj-http.client :as http-client] [malli.core :as m] @@ -39,14 +40,9 @@ new-token)))) (def source-enum [:enum :pleroma :blue_sky :mastodon :pixelfed :nostr]) -(def is-tech-enum [:enum :yes_ai :no :yes_human]) (defn valid-source? [source] (m/validate source-enum source)) -(def post-schema [:map - [:id :string] - [:remoteId :string]]) - (defn get-all-posts-by-source ([source] (get-all-posts-by-source source [] 1)) ([source carry page] @@ -71,11 +67,14 @@ (concat carry rows) (get-all-posts-by-source source (concat carry rows) (inc page)))))) -(defn get-latest-post-by-source [source] +(defn get-latest-post-remote-id-by-source [source] (let [res-schema [:map [:items - [:vector post-schema]]]] + [:vector + [:map + [:id string?] + [:remoteId string?]]]]]] (when (not (valid-source? source)) (throw (ex-info "Invalid source" {:source source}))) (as-> @@ -85,17 +84,17 @@ "perPage" 1 :sort "-posted" :filter (str "source = '" (name source) "'") - ;; :fields (str/join "," ["remoteId" "id"]) + :fields (str/join "," ["remoteId" "id"]) "skipTotal" true} :content-type :json :as :json}) x (:body x) - (utils/validate-with-throw x res-schema) - (-> x :items first)))) - -(defn get-latest-post-remote-id-by-source [source] - (tel/log! {:level :info :data {:source source}} "Fetching latest post remote ID for source") - (:remoteId (get-latest-post-by-source source))) + (if (m/validate res-schema x) + x + (do + (m/explain res-schema x) + (throw (ex-info "Res does not follow schema" {:res x})))) + (-> x :items first :remoteId)))) (defn post-with-remote-id-already-saved? [remote-id] (-> @@ -179,36 +178,24 @@ (let [save-post-schema [:map [:source source-enum] [:fullPost :any] - [:isTech is-tech-enum] + [:isTech :boolean] [:tags [:sequential :string]] [:images [:sequential [:tuple :string :string]]] [:remoteId :string] [:authorId :string] [:posted :string]]] + (utils/validate-with-throw post save-post-schema) (tel/log! {:level :info :data {:remoteId (:remoteId post)}} "Post passed save validation") (if (post-with-remote-id-already-saved? (:remoteId post)) - (tel/log! {:level :warn :data {:remoteId (:remoteId post)}} "Post already saved, skipping") - (try - (let [tag-ids (doall (map get-tag-id (:tags post))) - image-ids (doall (map #(apply get-image-id %) (:images post)))] - (http-client/post (str (@config :pocket-base-host) "/api/collections/micro_blog_posts/records") - {:headers {"Authorization" (get-login-token-with-cache)} - :form-params (assoc post - :source (name (:source post)) - :isTech (name (:isTech post)) - :tags tag-ids - :images image-ids) - :content-type :json - :as :json})) - (catch Exception e - (tel/log! {:level :error - :id :pocket-base/save-post-error - :data {:post post - :error (.getMessage e) - :exception e}} - "Error saving post to pocketbase") - (throw e)))))) + (println "post already saved") + (http-client/post (str (@config :pocket-base-host) "/api/collections/micro_blog_posts/records") + {:headers {"Authorization" (get-login-token-with-cache)} + :form-params (assoc post + :tags (map get-tag-id (:tags post)) + :images (map #(apply get-image-id %) (:images post))) + :content-type :json + :as :json})))) (defn set-is-tech [post-id is-tech] (tel/log! {:level :info :data {:post-id post-id}} "Setting post.is-tech to yes_ai") diff --git a/src/micro_blog/utils.clj b/src/micro_blog/utils.clj index 931028d..be84ba5 100644 --- a/src/micro_blog/utils.clj +++ b/src/micro_blog/utils.clj @@ -4,7 +4,6 @@ [malli.core :as m])) (defn validate-with-throw [value schema] - (tel/log! {:level :debug :data {:value value :schema schema}} "Validating value") (if (m/validate schema value) value (do