Compare commits
No commits in common. "main" and "clojure" have entirely different histories.
17 changed files with 736 additions and 186 deletions
15
README.md
15
README.md
|
|
@ -12,21 +12,8 @@ instance.
|
||||||
- [x] BlueSky
|
- [x] BlueSky
|
||||||
- [ ] Pixelfed
|
- [ ] Pixelfed
|
||||||
- [x] Mastodon
|
- [x] Mastodon
|
||||||
- [x] Nostr
|
- [ ] Nostr
|
||||||
|
|
||||||
## Dev
|
|
||||||
|
|
||||||
Run the app locally:
|
|
||||||
|
|
||||||
```shell
|
|
||||||
$ clj -M -m micro-blog.main
|
|
||||||
```
|
|
||||||
|
|
||||||
or just run parts via repl:
|
|
||||||
|
|
||||||
```shell
|
|
||||||
$ clj
|
|
||||||
```
|
|
||||||
|
|
||||||
## Deployment
|
## Deployment
|
||||||
|
|
||||||
|
|
|
||||||
16
build.sh
16
build.sh
|
|
@ -1,16 +0,0 @@
|
||||||
#!/bin/sh
|
|
||||||
|
|
||||||
set -e
|
|
||||||
|
|
||||||
export AWS_PROFILE=personal
|
|
||||||
export AWS_REGION=eu-central-1
|
|
||||||
|
|
||||||
REPO_NAME="micro-blog-fetchers-homelabstack"
|
|
||||||
|
|
||||||
if ! aws ecr describe-repositories --repository-names "$REPO_NAME" >/dev/null 2>&1; then
|
|
||||||
aws ecr create-repository --repository-name "$REPO_NAME"
|
|
||||||
fi
|
|
||||||
|
|
||||||
docker buildx build --platform linux/amd64,linux/arm64 -t "853019563312.dkr.ecr.eu-central-1.amazonaws.com/$REPO_NAME:latest" --push .
|
|
||||||
|
|
||||||
echo "Docker image built and pushed to AWS ECR"
|
|
||||||
4
deps.edn
4
deps.edn
|
|
@ -15,9 +15,7 @@
|
||||||
;; json
|
;; json
|
||||||
cheshire/cheshire {:mvn/version "6.0.0"}
|
cheshire/cheshire {:mvn/version "6.0.0"}
|
||||||
;; metosin/muuntaja {:mvn/version "0.6.11"}
|
;; metosin/muuntaja {:mvn/version "0.6.11"}
|
||||||
org.clojure/clojure {:mvn/version "1.12.1"}
|
org.clojure/clojure {:mvn/version "1.12.1"}}
|
||||||
;; websockets
|
|
||||||
hato/hato {:mvn/version "1.0.0"}}
|
|
||||||
|
|
||||||
:aliases
|
:aliases
|
||||||
{;; Run with clj -T:build function-in-build
|
{;; Run with clj -T:build function-in-build
|
||||||
|
|
|
||||||
155
old_node_src/bluesky.ts
Normal file
155
old_node_src/bluesky.ts
Normal file
|
|
@ -0,0 +1,155 @@
|
||||||
|
import pino from "pino";
|
||||||
|
import { MicroBlogBackend } from "./pocketbase";
|
||||||
|
|
||||||
|
// logger
|
||||||
|
const logger = pino();
|
||||||
|
|
||||||
|
// pocketbase
|
||||||
|
const pb = new MicroBlogBackend(logger);
|
||||||
|
|
||||||
|
type Session = {
|
||||||
|
did: string;
|
||||||
|
accessJwt: string;
|
||||||
|
refreshJwt: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
type BlueSkyPost = {
|
||||||
|
cid: string;
|
||||||
|
embed?: {
|
||||||
|
images: {
|
||||||
|
fullsize: string;
|
||||||
|
alt: string;
|
||||||
|
}[];
|
||||||
|
};
|
||||||
|
record: {
|
||||||
|
createdAt: string; // '2024-06-25T05:32:06.269Z',
|
||||||
|
// embed: { '$type': 'app.bsky.embed.images', images: [Array] },
|
||||||
|
// facets: [ [Object] ],
|
||||||
|
text: string;
|
||||||
|
facets: {
|
||||||
|
features: {
|
||||||
|
$type: string; //"app.bsky.richtext.facet#tag",\n' +
|
||||||
|
tag?: string; // "cooking"\n' +
|
||||||
|
}[];
|
||||||
|
}[];
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
const createSession = async (): Promise<Session> => {
|
||||||
|
const identifier = process.env.BLUE_SKY_USERNAME;
|
||||||
|
const apiKey = process.env.BLUE_SKY_API_KEY;
|
||||||
|
const body = JSON.stringify({ identifier, password: apiKey });
|
||||||
|
const url = "https://bsky.social/xrpc/com.atproto.server.createSession";
|
||||||
|
const res = await fetch(url, {
|
||||||
|
method: "POST",
|
||||||
|
headers: {
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
},
|
||||||
|
body: body,
|
||||||
|
});
|
||||||
|
const data = (await res.json()) as Session;
|
||||||
|
return data;
|
||||||
|
};
|
||||||
|
|
||||||
|
const limit = 10;
|
||||||
|
const getPostsUntilID = async (
|
||||||
|
session: Session,
|
||||||
|
id: string,
|
||||||
|
cursor: string | null = null,
|
||||||
|
oldFeed: BlueSkyPost[] = [],
|
||||||
|
): Promise<BlueSkyPost[]> => {
|
||||||
|
const params = new URLSearchParams();
|
||||||
|
params.append("actor", session.did);
|
||||||
|
params.append("limit", limit.toString());
|
||||||
|
if (cursor) {
|
||||||
|
params.append("cursor", cursor);
|
||||||
|
}
|
||||||
|
|
||||||
|
const urlWithParams = new URL(
|
||||||
|
"https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed",
|
||||||
|
);
|
||||||
|
urlWithParams.search = params.toString();
|
||||||
|
|
||||||
|
const res = await fetch(urlWithParams, {
|
||||||
|
headers: {
|
||||||
|
Accept: "application/json",
|
||||||
|
Authorization: `Bearer ${session.accessJwt}`,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const rawData = (await res.json()) as {
|
||||||
|
feed: { post: BlueSkyPost }[];
|
||||||
|
cursor: string | null;
|
||||||
|
};
|
||||||
|
const rawFeed = rawData.feed;
|
||||||
|
const feed = rawFeed.map((item) => item.post);
|
||||||
|
cursor = rawData?.cursor;
|
||||||
|
const filteredFeed = [];
|
||||||
|
for (const post of feed) {
|
||||||
|
if (post.cid === id) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
filteredFeed.push(post);
|
||||||
|
}
|
||||||
|
|
||||||
|
// the post id we are searching until is in the res so return
|
||||||
|
if (filteredFeed.length !== feed.length) {
|
||||||
|
return filteredFeed.concat(oldFeed);
|
||||||
|
}
|
||||||
|
// there are more posts to add before the id
|
||||||
|
if (feed.length === limit) {
|
||||||
|
return getPostsUntilID(session, id, cursor, oldFeed.concat(feed));
|
||||||
|
}
|
||||||
|
|
||||||
|
// the id was not found in the feed, return everything
|
||||||
|
return oldFeed.concat(feed);
|
||||||
|
};
|
||||||
|
|
||||||
|
const savePost = async (post: BlueSkyPost) => {
|
||||||
|
const postData = {
|
||||||
|
remoteId: post.cid,
|
||||||
|
posted: post.record.createdAt,
|
||||||
|
source: "blue_sky" as const,
|
||||||
|
fullPost: post,
|
||||||
|
authorId: "travisshears.bsky.social",
|
||||||
|
};
|
||||||
|
return await pb.savePost(postData);
|
||||||
|
};
|
||||||
|
|
||||||
|
const saveTags = async (post: BlueSkyPost, postId: string) => {
|
||||||
|
for (const facet of post.record.facets) {
|
||||||
|
for (const feature of facet.features) {
|
||||||
|
if (feature.$type === "app.bsky.richtext.facet#tag") {
|
||||||
|
const tag = feature.tag;
|
||||||
|
if (tag) {
|
||||||
|
await pb.setTag(tag, postId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const saveImages = async (post: BlueSkyPost, postId: string) => {
|
||||||
|
const images = post.embed?.images ?? [];
|
||||||
|
for (const image of images) {
|
||||||
|
await pb.saveAndSetImage(
|
||||||
|
{ remoteURL: image.fullsize, alt: image.alt },
|
||||||
|
postId,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
(async () => {
|
||||||
|
const session = await createSession();
|
||||||
|
const lastSavedPostId = await pb.getLatestPostRemoteIDBySource("blue_sky");
|
||||||
|
const posts = await getPostsUntilID(session, lastSavedPostId ?? "");
|
||||||
|
posts.reverse(); // save the oldest post first so if we fail posts are not lost on the next run
|
||||||
|
if (posts.length === 0) {
|
||||||
|
logger.info("No new posts to save");
|
||||||
|
}
|
||||||
|
for (const post of posts) {
|
||||||
|
logger.info({ post }, "saving post");
|
||||||
|
const savedNewPost = await savePost(post);
|
||||||
|
await saveTags(post, savedNewPost.id);
|
||||||
|
await saveImages(post, savedNewPost.id);
|
||||||
|
}
|
||||||
|
})();
|
||||||
103
old_node_src/mastodon.ts
Normal file
103
old_node_src/mastodon.ts
Normal file
|
|
@ -0,0 +1,103 @@
|
||||||
|
import pino from "pino";
|
||||||
|
import { MicroBlogBackend } from "./pocketbase";
|
||||||
|
|
||||||
|
// custom destination formatter
|
||||||
|
const logger = pino();
|
||||||
|
|
||||||
|
const pb = new MicroBlogBackend(logger);
|
||||||
|
|
||||||
|
const baseURL = process.env.MASTODON_BASE_URL;
|
||||||
|
const accountId = process.env.MASTODON_ACCOUNT_ID;
|
||||||
|
|
||||||
|
type MastodonPost = {
|
||||||
|
media_attachments: {
|
||||||
|
type: string; //'image',
|
||||||
|
url: string;
|
||||||
|
description: string; // 'Blurry gate',
|
||||||
|
}[];
|
||||||
|
id: string;
|
||||||
|
content: string;
|
||||||
|
account: {
|
||||||
|
id: string;
|
||||||
|
};
|
||||||
|
created_at: string;
|
||||||
|
tags: { name: string }[];
|
||||||
|
};
|
||||||
|
|
||||||
|
const getPostUntilId = async ({
|
||||||
|
lastSavedId,
|
||||||
|
maxId,
|
||||||
|
carryPosts = [],
|
||||||
|
}: {
|
||||||
|
lastSavedId?: string;
|
||||||
|
maxId?: string;
|
||||||
|
carryPosts?: MastodonPost[];
|
||||||
|
}): Promise<MastodonPost[]> => {
|
||||||
|
const params = new URLSearchParams();
|
||||||
|
params.append("limit", "10");
|
||||||
|
const urlWithParams = new URL(
|
||||||
|
`${baseURL}/api/v1/accounts/${accountId}/statuses`,
|
||||||
|
);
|
||||||
|
urlWithParams.search = params.toString();
|
||||||
|
|
||||||
|
const res = await fetch(urlWithParams);
|
||||||
|
const posts = (await res.json()) as MastodonPost[];
|
||||||
|
const containsId = posts.some((post) => post.id === lastSavedId);
|
||||||
|
|
||||||
|
if (!containsId && posts.length >= 5) {
|
||||||
|
return getPostUntilId({
|
||||||
|
lastSavedId,
|
||||||
|
carryPosts: carryPosts?.concat(posts),
|
||||||
|
maxId: posts[posts.length - 1]?.id,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const allPosts = carryPosts?.concat(posts).reverse();
|
||||||
|
if (lastSavedId) {
|
||||||
|
const index = allPosts.findIndex((post) => post.id === lastSavedId);
|
||||||
|
return allPosts.slice(index + 1);
|
||||||
|
}
|
||||||
|
return allPosts;
|
||||||
|
};
|
||||||
|
|
||||||
|
const savePost = async (post: MastodonPost) => {
|
||||||
|
const postData = {
|
||||||
|
remoteId: post.id,
|
||||||
|
authorId: post.account.id,
|
||||||
|
posted: post.created_at,
|
||||||
|
source: "mastodon" as const,
|
||||||
|
fullPost: post,
|
||||||
|
};
|
||||||
|
return await pb.savePost(postData);
|
||||||
|
};
|
||||||
|
|
||||||
|
const saveTags = async (post: MastodonPost, postId: string) => {
|
||||||
|
logger.info({ tags: post.tags }, "saving tags");
|
||||||
|
for (const tag of post.tags) {
|
||||||
|
await pb.setTag(tag.name, postId);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const saveImages = async (post: MastodonPost, postId: string) => {
|
||||||
|
logger.info({ images: post.media_attachments }, "saving images");
|
||||||
|
for (const image of post.media_attachments) {
|
||||||
|
await pb.saveAndSetImage(
|
||||||
|
{ remoteURL: image.url, alt: image.description },
|
||||||
|
postId,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
(async () => {
|
||||||
|
const lastSavedPostId = await pb.getLatestPostRemoteIDBySource("mastodon");
|
||||||
|
console.log({ lastSavedPostId });
|
||||||
|
const posts = await getPostUntilId({ lastSavedId: lastSavedPostId });
|
||||||
|
console.log({ posts });
|
||||||
|
|
||||||
|
for (const post of posts) {
|
||||||
|
logger.info({ post }, "saving post");
|
||||||
|
const savedNewPost = await savePost(post);
|
||||||
|
await saveTags(post, savedNewPost.id);
|
||||||
|
await saveImages(post, savedNewPost.id);
|
||||||
|
}
|
||||||
|
})();
|
||||||
106
old_node_src/nostr.ts
Normal file
106
old_node_src/nostr.ts
Normal file
|
|
@ -0,0 +1,106 @@
|
||||||
|
import pino from "pino";
|
||||||
|
import WebSocket from "ws";
|
||||||
|
import { MicroBlogBackend } from "./pocketbase";
|
||||||
|
|
||||||
|
const logger = pino();
|
||||||
|
const pb = new MicroBlogBackend(logger);
|
||||||
|
|
||||||
|
const fetcherNpub = process.env.NOSTR_FETCHER_NPUB;
|
||||||
|
const myNpub = process.env.NOSTR_ID;
|
||||||
|
|
||||||
|
if (!fetcherNpub) {
|
||||||
|
throw new Error("NOSTR_FETCHER_NPUB is not set");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!myNpub) {
|
||||||
|
throw new Error("NOSTR_ID is not set");
|
||||||
|
}
|
||||||
|
|
||||||
|
type NostrTag = ["t" | "r" | "imeta", string];
|
||||||
|
type NostrEvent = [
|
||||||
|
"EVENT" | "EOSE",
|
||||||
|
string,
|
||||||
|
{
|
||||||
|
id: string;
|
||||||
|
pubkey: string;
|
||||||
|
created_at: number;
|
||||||
|
kind: number; // 1
|
||||||
|
tags: NostrTag[];
|
||||||
|
content: string;
|
||||||
|
sig: string;
|
||||||
|
},
|
||||||
|
];
|
||||||
|
|
||||||
|
(async () => {
|
||||||
|
logger.info("Starting Nostr Fetcher");
|
||||||
|
|
||||||
|
// figure out when the last post of wasved
|
||||||
|
const lastSavedPost = await pb.getLatestPostBySource("nostr");
|
||||||
|
if (!lastSavedPost) {
|
||||||
|
throw new Error("No last saved nostr post found");
|
||||||
|
}
|
||||||
|
let since: number | undefined;
|
||||||
|
since = new Date(lastSavedPost.posted).getTime() / 1000;
|
||||||
|
logger.info(
|
||||||
|
{ lastSavedPostId: lastSavedPost.id, since },
|
||||||
|
"lastSavedPost nostr post",
|
||||||
|
);
|
||||||
|
// listen for new events for 30 seconds
|
||||||
|
logger.info("trying to connecting to nostr relay");
|
||||||
|
const relay = process.env.NOSTR_RELAY;
|
||||||
|
if (!relay) {
|
||||||
|
throw new Error("No NOSTR_RELAY environment variable found");
|
||||||
|
}
|
||||||
|
|
||||||
|
const events: NostrEvent[] = [];
|
||||||
|
const ws = new WebSocket(relay);
|
||||||
|
ws.on("error", logger.error);
|
||||||
|
ws.on("message", function message(data: Buffer) {
|
||||||
|
const decodedData = JSON.parse(
|
||||||
|
Buffer.from(data).toString("utf8"),
|
||||||
|
) as NostrEvent;
|
||||||
|
logger.info({ decodedData }, "recived a message from nostr relay");
|
||||||
|
if (decodedData[0] === "EVENT") {
|
||||||
|
events.push(decodedData);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
ws.on("open", function open() {
|
||||||
|
logger.info("connection established");
|
||||||
|
ws.send(
|
||||||
|
JSON.stringify([
|
||||||
|
"REQ",
|
||||||
|
fetcherNpub,
|
||||||
|
{ kinds: [1], authors: [myNpub], ...(since ? { since } : {}) },
|
||||||
|
]),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 30000));
|
||||||
|
logger.info("closing connection to nostr relay");
|
||||||
|
ws.close();
|
||||||
|
logger.info({ count: events.length }, "saving nostr posts");
|
||||||
|
for (const event of events) {
|
||||||
|
const post = await pb.savePost({
|
||||||
|
remoteId: event[2].id,
|
||||||
|
fullPost: event[2],
|
||||||
|
posted: new Date(event[2].created_at * 1000).toISOString(),
|
||||||
|
source: "nostr",
|
||||||
|
authorId: event[1],
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const tag of event[2].tags) {
|
||||||
|
if (tag[0] === "t") {
|
||||||
|
await pb.setTag(tag[1], post.id);
|
||||||
|
} else if (tag[0] === "imeta") {
|
||||||
|
const value = tag[1];
|
||||||
|
// remove "url " from the start of the string
|
||||||
|
const url = value.slice(4);
|
||||||
|
await pb.saveAndSetImage(
|
||||||
|
{
|
||||||
|
remoteURL: url,
|
||||||
|
},
|
||||||
|
post.id,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})();
|
||||||
107
old_node_src/pixelfed.ts
Normal file
107
old_node_src/pixelfed.ts
Normal file
|
|
@ -0,0 +1,107 @@
|
||||||
|
import pino from "pino";
|
||||||
|
import { lambdaRequestTracker, pinoLambdaDestination } from "pino-lambda";
|
||||||
|
import { MicroBlogBackend } from "./pocketbase";
|
||||||
|
|
||||||
|
// custom destination formatter
|
||||||
|
const destination = pinoLambdaDestination();
|
||||||
|
const logger = pino({}, destination);
|
||||||
|
const withRequest = lambdaRequestTracker();
|
||||||
|
|
||||||
|
const baseURL = process.env.PIXELFED_BASE_URL;
|
||||||
|
const accountID = process.env.PIXELFED_ACCOUNT_ID;
|
||||||
|
|
||||||
|
const pb = new MicroBlogBackend(logger);
|
||||||
|
|
||||||
|
type PixelFedPost = {
|
||||||
|
media_attachments: {
|
||||||
|
type: string; //'image',
|
||||||
|
url: string; // 'https://gram.social/storage/m/_v2/703621281309160235/530d83cd3-f15549/FR41GdSiUQY0/bNHMrzuQkuhXKKfR1zG4HHcjFTe6G2YF02SOr2zi.jpg',
|
||||||
|
description: string; // 'Blurry gate',
|
||||||
|
}[];
|
||||||
|
id: string;
|
||||||
|
content: string;
|
||||||
|
account: {
|
||||||
|
id: string;
|
||||||
|
};
|
||||||
|
created_at: string;
|
||||||
|
tags: { name: string }[];
|
||||||
|
};
|
||||||
|
|
||||||
|
const getPostUntilId = async ({
|
||||||
|
lastSavedId,
|
||||||
|
maxId,
|
||||||
|
carryPosts = [],
|
||||||
|
}: {
|
||||||
|
lastSavedId?: string;
|
||||||
|
maxId?: string;
|
||||||
|
carryPosts?: PixelFedPost[];
|
||||||
|
}): Promise<PixelFedPost[]> => {
|
||||||
|
const params = new URLSearchParams();
|
||||||
|
params.append("limit", "5");
|
||||||
|
params.append("only_media", "true");
|
||||||
|
if (maxId) {
|
||||||
|
params.append("max_id", maxId);
|
||||||
|
}
|
||||||
|
|
||||||
|
const urlWithParams = new URL(`${baseURL}${accountID}/statuses`);
|
||||||
|
urlWithParams.search = params.toString();
|
||||||
|
|
||||||
|
const res = await fetch(urlWithParams.toString());
|
||||||
|
const posts = (await res.json()) as PixelFedPost[];
|
||||||
|
const containsId = posts.some((post) => post.id === lastSavedId);
|
||||||
|
|
||||||
|
if (!containsId && posts.length >= 5) {
|
||||||
|
return getPostUntilId({
|
||||||
|
lastSavedId,
|
||||||
|
carryPosts: carryPosts?.concat(posts),
|
||||||
|
maxId: posts[posts.length - 1]?.id,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const allPosts = carryPosts?.concat(posts).reverse();
|
||||||
|
if (lastSavedId) {
|
||||||
|
const index = allPosts.findIndex((post) => post.id === lastSavedId);
|
||||||
|
return allPosts.slice(index + 1);
|
||||||
|
}
|
||||||
|
return allPosts;
|
||||||
|
};
|
||||||
|
|
||||||
|
const savePost = async (post: PixelFedPost) => {
|
||||||
|
const postData = {
|
||||||
|
remoteId: post.id,
|
||||||
|
authorId: post.account.id,
|
||||||
|
posted: post.created_at,
|
||||||
|
source: "pixelfed" as const,
|
||||||
|
fullPost: post,
|
||||||
|
};
|
||||||
|
return await pb.savePost(postData);
|
||||||
|
};
|
||||||
|
|
||||||
|
const saveTags = async (post: PixelFedPost, postId: string) => {
|
||||||
|
logger.info({ tags: post.tags }, "saving tags");
|
||||||
|
for (const tag of post.tags) {
|
||||||
|
await pb.setTag(tag.name, postId);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
const saveImages = async (post: PixelFedPost, postId: string) => {
|
||||||
|
logger.info({ images: post.media_attachments }, "saving images");
|
||||||
|
for (const image of post.media_attachments) {
|
||||||
|
await pb.saveAndSetImage(
|
||||||
|
{ remoteURL: image.url, alt: image.description },
|
||||||
|
postId
|
||||||
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
exports.run = async (event: any, context: any) => {
|
||||||
|
withRequest(event, context);
|
||||||
|
const lastSavedPostId = await pb.getLatestPostRemoteIDBySource("pixelfed");
|
||||||
|
const posts = await getPostUntilId({ lastSavedId: lastSavedPostId });
|
||||||
|
for (const post of posts) {
|
||||||
|
logger.info({ post }, "saving post");
|
||||||
|
const savedNewPost = await savePost(post);
|
||||||
|
await saveTags(post, savedNewPost.id);
|
||||||
|
await saveImages(post, savedNewPost.id);
|
||||||
|
}
|
||||||
|
};
|
||||||
223
old_node_src/pocketbase.ts
Normal file
223
old_node_src/pocketbase.ts
Normal file
|
|
@ -0,0 +1,223 @@
|
||||||
|
import type { Logger } from "pino";
|
||||||
|
import PocketBase from "pocketbase";
|
||||||
|
|
||||||
|
export type MicroBlogPostImage = {
|
||||||
|
id: string;
|
||||||
|
collectionId: string;
|
||||||
|
image: string;
|
||||||
|
alt?: string;
|
||||||
|
remoteURL: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type MicroBlogPostTag = {
|
||||||
|
id: string;
|
||||||
|
tag: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
export type MicroBlogPostSource =
|
||||||
|
| "blue_sky"
|
||||||
|
| "mastodon"
|
||||||
|
| "pleroma"
|
||||||
|
| "pixelfed"
|
||||||
|
| "nostr";
|
||||||
|
|
||||||
|
export type MicroBlogPost = {
|
||||||
|
source: MicroBlogPostSource;
|
||||||
|
fullPost: any;
|
||||||
|
remoteId: string;
|
||||||
|
authorId: string;
|
||||||
|
id: string;
|
||||||
|
posted: string;
|
||||||
|
expand: {
|
||||||
|
images?: MicroBlogPostImage[];
|
||||||
|
tags?: {
|
||||||
|
id: string;
|
||||||
|
}[];
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
export class MicroBlogBackend {
|
||||||
|
private pb: PocketBase;
|
||||||
|
private clientSetTime?: Date;
|
||||||
|
constructor(private logger: Logger) {
|
||||||
|
this.pb = new PocketBase(process.env.POCKET_BASE_HOST);
|
||||||
|
}
|
||||||
|
|
||||||
|
private async login() {
|
||||||
|
const pw = process.env.POCKET_BASE_PW;
|
||||||
|
const userName = process.env.POCKET_BASE_USER;
|
||||||
|
if (!pw) {
|
||||||
|
this.logger.error("POCKET_BASE_PW env var not set");
|
||||||
|
throw new Error("POCKET_BASE_PW env var not set");
|
||||||
|
}
|
||||||
|
if (!userName) {
|
||||||
|
this.logger.error("POCKET_BASE_USER env var not set");
|
||||||
|
throw new Error("POCKET_BASE_USER env var not set");
|
||||||
|
}
|
||||||
|
this.logger.info({ userName }, "Logging in to pocketbase");
|
||||||
|
await this.pb.collection("users").authWithPassword(userName, pw);
|
||||||
|
this.clientSetTime = new Date();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async checkLogin() {
|
||||||
|
if (!this.clientSetTime) {
|
||||||
|
await this.login();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const now = new Date();
|
||||||
|
const diff = now.getTime() - this.clientSetTime.getTime();
|
||||||
|
const day = 86_400_000;
|
||||||
|
|
||||||
|
if (diff > day) {
|
||||||
|
await this.login();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getLatestPostBySource(
|
||||||
|
postSource: MicroBlogPostSource,
|
||||||
|
): Promise<MicroBlogPost | undefined> {
|
||||||
|
await this.checkLogin();
|
||||||
|
try {
|
||||||
|
const post = await this.pb
|
||||||
|
.collection<MicroBlogPost>("micro_blog_posts")
|
||||||
|
.getFirstListItem(`source = '${postSource}'`, { sort: "-posted" });
|
||||||
|
return post;
|
||||||
|
} catch (error: any) {
|
||||||
|
if (error.status === 404) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getLatestPostRemoteIDBySource(postSource: MicroBlogPostSource) {
|
||||||
|
await this.checkLogin();
|
||||||
|
const post = await this.getLatestPostBySource(postSource);
|
||||||
|
return post?.remoteId;
|
||||||
|
}
|
||||||
|
|
||||||
|
async getTag(tag: string): Promise<MicroBlogPostTag | undefined> {
|
||||||
|
await this.checkLogin();
|
||||||
|
try {
|
||||||
|
const remoteTag = await this.pb
|
||||||
|
.collection<MicroBlogPostTag>("micro_blog_tags")
|
||||||
|
.getFirstListItem(`tag = '${tag}'`);
|
||||||
|
return remoteTag;
|
||||||
|
} catch (e: any) {
|
||||||
|
if (e.status === 404) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async getImageByRemoteURL(
|
||||||
|
remoteURL: string,
|
||||||
|
): Promise<MicroBlogPostImage | undefined> {
|
||||||
|
await this.checkLogin();
|
||||||
|
try {
|
||||||
|
const remoteImage = await this.pb
|
||||||
|
.collection<MicroBlogPostImage>("micro_blog_images")
|
||||||
|
.getFirstListItem(`remoteURL = '${remoteURL}'`);
|
||||||
|
return remoteImage;
|
||||||
|
} catch (e: any) {
|
||||||
|
if (e.status === 404) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async checkForPost(
|
||||||
|
remoteId: string,
|
||||||
|
): Promise<MicroBlogPost | undefined> {
|
||||||
|
await this.checkLogin();
|
||||||
|
try {
|
||||||
|
return await this.pb
|
||||||
|
.collection<MicroBlogPost>("micro_blog_posts")
|
||||||
|
.getFirstListItem(`remoteId = '${remoteId}'`);
|
||||||
|
} catch (e: any) {
|
||||||
|
if (e.status === 404) {
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async savePost(
|
||||||
|
post: Omit<MicroBlogPost, "id" | "expand">,
|
||||||
|
): Promise<MicroBlogPost> {
|
||||||
|
await this.checkLogin();
|
||||||
|
const existingPost = await this.checkForPost(post.remoteId);
|
||||||
|
if (!existingPost) {
|
||||||
|
return await this.pb
|
||||||
|
.collection<MicroBlogPost>("micro_blog_posts")
|
||||||
|
.create(post);
|
||||||
|
}
|
||||||
|
this.logger.info({ existingPost }, "Found existing post");
|
||||||
|
return existingPost;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async setTag(rawTag: string, postId: string) {
|
||||||
|
await this.checkLogin();
|
||||||
|
let tag = await this.getTag(rawTag);
|
||||||
|
if (!tag) {
|
||||||
|
tag = await this.pb
|
||||||
|
.collection<MicroBlogPostTag>("micro_blog_tags")
|
||||||
|
.create({ tag: rawTag });
|
||||||
|
}
|
||||||
|
if (!tag) {
|
||||||
|
throw new Error("Failed to create tag");
|
||||||
|
}
|
||||||
|
await this.pb.collection("micro_blog_posts").update(postId, {
|
||||||
|
"tags+": tag.id,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public async saveAndSetImage(
|
||||||
|
imageToSave: Omit<MicroBlogPostImage, "id" | "image" | "collectionId">,
|
||||||
|
postId: string,
|
||||||
|
) {
|
||||||
|
await this.checkLogin();
|
||||||
|
let image = await this.getImageByRemoteURL(imageToSave.remoteURL);
|
||||||
|
if (!image) {
|
||||||
|
const imageResponse = await fetch(imageToSave.remoteURL);
|
||||||
|
if (!imageResponse.ok) {
|
||||||
|
throw new Error("Failed to download image");
|
||||||
|
}
|
||||||
|
const imageBlob = await imageResponse.blob();
|
||||||
|
const imageFile = new File([imageBlob], "image.jpg", {
|
||||||
|
type: imageBlob.type,
|
||||||
|
});
|
||||||
|
const data = {
|
||||||
|
...imageToSave,
|
||||||
|
image: imageFile,
|
||||||
|
};
|
||||||
|
image = await this.pb
|
||||||
|
.collection<MicroBlogPostTag>("micro_blog_images")
|
||||||
|
.create(data);
|
||||||
|
this.logger.info({ image }, "Created image");
|
||||||
|
}
|
||||||
|
if (!image) {
|
||||||
|
throw new Error("Failed to create image");
|
||||||
|
}
|
||||||
|
const res = await this.pb.collection("micro_blog_posts").update(postId, {
|
||||||
|
"images+": image.id,
|
||||||
|
});
|
||||||
|
this.logger.info({ res }, "Updated post with image");
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getPosts(page: number, limit = 20) {
|
||||||
|
await this.checkLogin();
|
||||||
|
const resultList = await this.pb
|
||||||
|
.collection<MicroBlogPost>("micro_blog_posts")
|
||||||
|
.getList(page, limit, {
|
||||||
|
sort: "-posted",
|
||||||
|
expand: "images,tags",
|
||||||
|
filter: `(source = "blue_sky" || source = "pleroma")`,
|
||||||
|
// filter: 'source = "pleroma"',
|
||||||
|
});
|
||||||
|
return resultList;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -4,11 +4,10 @@
|
||||||
[micro-blog.config :refer [config]]
|
[micro-blog.config :refer [config]]
|
||||||
micro-blog.mastodon
|
micro-blog.mastodon
|
||||||
micro-blog.blue-sky
|
micro-blog.blue-sky
|
||||||
micro-blog.nostr
|
|
||||||
[taoensso.telemere :as tel]))
|
[taoensso.telemere :as tel]))
|
||||||
|
|
||||||
(defn mastodon-proc-handler [_request]
|
(defn mastodon-proc-handler [_request]
|
||||||
(let [msg "Procding Mastodon Scrape"]
|
(let [msg "Proceding Mastodon Scrape"]
|
||||||
(tel/log! :info msg)
|
(tel/log! :info msg)
|
||||||
(micro-blog.mastodon/run)
|
(micro-blog.mastodon/run)
|
||||||
{:status 200
|
{:status 200
|
||||||
|
|
@ -16,23 +15,14 @@
|
||||||
|
|
||||||
(defn blue-sky-proc-handler [_request]
|
(defn blue-sky-proc-handler [_request]
|
||||||
(let [msg "Procing BlueSky Scrape"]
|
(let [msg "Procing BlueSky Scrape"]
|
||||||
(tel/log! :info msg)
|
(tel/log! :info msg)
|
||||||
(micro-blog.blue-sky/run)
|
(micro-blog.mastodon/run)
|
||||||
{:status 200
|
{:status 200
|
||||||
:body msg}))
|
:body msg}))
|
||||||
|
|
||||||
(defn nostr-proc-handler [_request]
|
|
||||||
(let [msg "Restarting Nostr scraper"]
|
|
||||||
(tel/log! :info msg)
|
|
||||||
(micro-blog.nostr/close)
|
|
||||||
(micro-blog.nostr/start)
|
|
||||||
{:status 200
|
|
||||||
:body msg}))
|
|
||||||
|
|
||||||
(def routes
|
(def routes
|
||||||
#{["/bluesky" :get blue-sky-proc-handler :route-name :blue-sky]
|
#{["/bluesky" :get blue-sky-proc-handler :route-name :blue-sky]
|
||||||
["/mastodon" :get mastodon-proc-handler :route-name :mastodon]
|
["/mastodon" :get mastodon-proc-handler :route-name :mastodon]})
|
||||||
["/nostr" :get nostr-proc-handler :route-name :nostr]})
|
|
||||||
|
|
||||||
(defn create-connector []
|
(defn create-connector []
|
||||||
(-> (conn/default-connector-map (:api-host @config) (Integer/parseInt (str (:api-port @config))))
|
(-> (conn/default-connector-map (:api-host @config) (Integer/parseInt (str (:api-port @config))))
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,6 @@
|
||||||
[clj-http.client :as http-client]
|
[clj-http.client :as http-client]
|
||||||
[micro-blog.pocket-base :as pb]
|
[micro-blog.pocket-base :as pb]
|
||||||
[micro-blog.utils :as utils]
|
[micro-blog.utils :as utils]
|
||||||
[cheshire.core :as json]
|
|
||||||
[micro-blog.is-tech]
|
[micro-blog.is-tech]
|
||||||
[taoensso.telemere :as tel]
|
[taoensso.telemere :as tel]
|
||||||
[micro-blog.config :refer [config]]))
|
[micro-blog.config :refer [config]]))
|
||||||
|
|
@ -47,7 +46,7 @@
|
||||||
(defn get-posts-until-id
|
(defn get-posts-until-id
|
||||||
([session id] (get-posts-until-id session id nil []))
|
([session id] (get-posts-until-id session id nil []))
|
||||||
([session id cursor prev-posts]
|
([session id cursor prev-posts]
|
||||||
(tel/log! {:level :info :data {:id id, :cursor cursor, :prev-posts (count prev-posts)}} "Getting posts until id")
|
(tel/log! {:level :info :data {:postId (:remoteId id)}} "Getting posts until id")
|
||||||
(let [limit 5
|
(let [limit 5
|
||||||
body
|
body
|
||||||
(-> (http-client/get (str (@config :blue-sky-host) "/app.bsky.feed.getAuthorFeed")
|
(-> (http-client/get (str (@config :blue-sky-host) "/app.bsky.feed.getAuthorFeed")
|
||||||
|
|
@ -83,11 +82,10 @@
|
||||||
(map #(vector (:fullsize %) (:alt %)) images)))
|
(map #(vector (:fullsize %) (:alt %)) images)))
|
||||||
|
|
||||||
(defn transform-post [post]
|
(defn transform-post [post]
|
||||||
(tel/log! {:level :info :data {:post post}} "Transforming post")
|
|
||||||
(hash-map :source :blue_sky
|
(hash-map :source :blue_sky
|
||||||
:fullPost post
|
:fullPost post
|
||||||
:remoteId (:cid post)
|
:remoteId (:cid post)
|
||||||
:isTech (micro-blog.is-tech/is-tech? (json/generate-string (:record post)))
|
:isTech (micro-blog.is-tech/is-tech? (:record post))
|
||||||
:authorId (get-in post [:author :handle])
|
:authorId (get-in post [:author :handle])
|
||||||
:tags (extract-tags post)
|
:tags (extract-tags post)
|
||||||
:images (extract-images post)
|
:images (extract-images post)
|
||||||
|
|
|
||||||
|
|
@ -8,11 +8,7 @@
|
||||||
:mastodon-host "MASTODON_BASE_URL"
|
:mastodon-host "MASTODON_BASE_URL"
|
||||||
:mastodon-account-id "MASTODON_ACCOUNT_ID"
|
:mastodon-account-id "MASTODON_ACCOUNT_ID"
|
||||||
:api-host "API_HOST"
|
:api-host "API_HOST"
|
||||||
:api-port "API_PORT"
|
:api-port "API_PORT"})
|
||||||
;; :nostr-fetcher-npub "NOSTR_FETCHER_NPUB"
|
|
||||||
:nostr-fetcher-id "NOSTR_FETCHER_PUB_HEX"
|
|
||||||
:nostr-id "NOSTR_ID"
|
|
||||||
:nostr-relay "NOSTR_RELAY"})
|
|
||||||
|
|
||||||
(defn- load-config []
|
(defn- load-config []
|
||||||
(merge (read-string (slurp "config.edn"))
|
(merge (read-string (slurp "config.edn"))
|
||||||
|
|
|
||||||
|
|
@ -4,8 +4,7 @@
|
||||||
[taoensso.telemere :as tel]
|
[taoensso.telemere :as tel]
|
||||||
[clj-http.client :as client]))
|
[clj-http.client :as client]))
|
||||||
|
|
||||||
(defn is-tech? [post-text]
|
(defn call-mistral-api [post-text]
|
||||||
(when (not (string? post-text)) (throw (ex-info "Post text must be a string" {:post-text post-text})))
|
|
||||||
(let [url (str (:mistral-host @config) "/v1/conversations")
|
(let [url (str (:mistral-host @config) "/v1/conversations")
|
||||||
headers {"Content-Type" "application/json"
|
headers {"Content-Type" "application/json"
|
||||||
"Accept" "application/json"
|
"Accept" "application/json"
|
||||||
|
|
@ -13,7 +12,7 @@
|
||||||
body {:inputs post-text
|
body {:inputs post-text
|
||||||
:stream false
|
:stream false
|
||||||
:agent_id (@config :mistral-agent-id)}]
|
:agent_id (@config :mistral-agent-id)}]
|
||||||
(tel/log! {:level :info :data {:url url :agent_id (:agent_id body) :post-text post-text}} "making request to mistral agent")
|
(tel/log! {:level :info :data {:url url :agent_id (:agent_id body)}} "making request to mistral agent")
|
||||||
(->
|
(->
|
||||||
(client/post url {:headers headers
|
(client/post url {:headers headers
|
||||||
:form-params body
|
:form-params body
|
||||||
|
|
@ -23,4 +22,7 @@
|
||||||
:outputs
|
:outputs
|
||||||
first
|
first
|
||||||
:content
|
:content
|
||||||
(#(if (= "1" %) :yes_ai :no)))))
|
(#(if (= "1" %) true false)))))
|
||||||
|
|
||||||
|
(defn is-tech? [post-text]
|
||||||
|
(call-mistral-api post-text))
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,6 @@
|
||||||
[taoensso.telemere :as tel]
|
[taoensso.telemere :as tel]
|
||||||
[micro-blog.logging.main :as logging]
|
[micro-blog.logging.main :as logging]
|
||||||
micro-blog.api
|
micro-blog.api
|
||||||
[micro-blog.nostr :as nostr]
|
|
||||||
[micro-blog.blue-sky :as blue-sky]
|
[micro-blog.blue-sky :as blue-sky]
|
||||||
[micro-blog.mastodon :as masto]))
|
[micro-blog.mastodon :as masto]))
|
||||||
|
|
||||||
|
|
@ -16,8 +15,6 @@
|
||||||
(logging/setup-logging)
|
(logging/setup-logging)
|
||||||
(tel/log! :info "Setting up API")
|
(tel/log! :info "Setting up API")
|
||||||
(micro-blog.api/start)
|
(micro-blog.api/start)
|
||||||
(tel/log! :info "Setting up nostr scraper")
|
|
||||||
(nostr/start)
|
|
||||||
(tel/log! :info "Setting up crons")
|
(tel/log! :info "Setting up crons")
|
||||||
(doseq [[i cron] (map-indexed vector crons)]
|
(doseq [[i cron] (map-indexed vector crons)]
|
||||||
(let [start (.plus (Instant/now) (Duration/ofMinutes (* i 5)))]
|
(let [start (.plus (Instant/now) (Duration/ofMinutes (* i 5)))]
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@
|
||||||
[:media_attachments [:vector [:map
|
[:media_attachments [:vector [:map
|
||||||
[:url :string]
|
[:url :string]
|
||||||
[:type [:= "image"]]
|
[:type [:= "image"]]
|
||||||
[:description [:maybe :string]]]]]]])
|
[:description :string]]]]]])
|
||||||
|
|
||||||
(defn get-posts-until-id [id]
|
(defn get-posts-until-id [id]
|
||||||
(let [limit 10
|
(let [limit 10
|
||||||
|
|
@ -50,7 +50,7 @@
|
||||||
:remoteId (:id raw-post)
|
:remoteId (:id raw-post)
|
||||||
:authorId (get-in raw-post [:account :id])
|
:authorId (get-in raw-post [:account :id])
|
||||||
:tags (map :name (:tags raw-post))
|
:tags (map :name (:tags raw-post))
|
||||||
:images (map (fn [img] [(:url img) (or (:description img) "")]) (:media_attachments raw-post))
|
:images (map (fn [img] [(:url img) (:description img)]) (:media_attachments raw-post))
|
||||||
:posted (:created_at raw-post)))
|
:posted (:created_at raw-post)))
|
||||||
|
|
||||||
(defn save-post [post]
|
(defn save-post [post]
|
||||||
|
|
|
||||||
|
|
@ -1,82 +0,0 @@
|
||||||
(ns micro-blog.nostr
|
|
||||||
(:require
|
|
||||||
[micro-blog.pocket-base :as pb]
|
|
||||||
[micro-blog.is-tech]
|
|
||||||
[taoensso.telemere :as tel]
|
|
||||||
[hato.websocket :as ws]
|
|
||||||
[cheshire.core :as json]
|
|
||||||
[clojure.string :as str]
|
|
||||||
[micro-blog.config :refer [config]])
|
|
||||||
(:import
|
|
||||||
[java.time Instant OffsetDateTime ZoneOffset]
|
|
||||||
[java.time.format DateTimeFormatter]))
|
|
||||||
|
|
||||||
(defn pb-date-to-unix-timestamp-seconds [date-str]
|
|
||||||
(-> date-str
|
|
||||||
(str/replace " " "T")
|
|
||||||
(Instant/parse)
|
|
||||||
(.getEpochSecond)))
|
|
||||||
|
|
||||||
(defn nostr-date-to-pb [ts]
|
|
||||||
(let [instant (java.time.Instant/ofEpochSecond ts)
|
|
||||||
formatter (java.time.format.DateTimeFormatter/ofPattern "yyyy-MM-dd HH:mm:ss.SSSX")
|
|
||||||
zoned (.atZone instant java.time.ZoneOffset/UTC)]
|
|
||||||
(.format formatter zoned)))
|
|
||||||
|
|
||||||
(defn last-post-timestamp []
|
|
||||||
(pb-date-to-unix-timestamp-seconds (:posted (pb/get-latest-post-by-source :nostr))))
|
|
||||||
|
|
||||||
(defn transform-post [post]
|
|
||||||
(tel/log! {:level :info :data {:post post}} "Transforming nostr post")
|
|
||||||
(hash-map :source :nostr
|
|
||||||
:fullPost post
|
|
||||||
:remoteId (get post "id")
|
|
||||||
:isTech (micro-blog.is-tech/is-tech? (get post "content"))
|
|
||||||
:authorId (get post "pubkey")
|
|
||||||
:tags (reduce (fn [acc tag]
|
|
||||||
(let [tag-type (first tag)
|
|
||||||
tag-value (second tag)]
|
|
||||||
(if (= tag-type "t")
|
|
||||||
(conj acc tag-value)
|
|
||||||
acc))) [] (get post "tags"))
|
|
||||||
:images []
|
|
||||||
:posted (nostr-date-to-pb (get post "created_at"))))
|
|
||||||
|
|
||||||
(defn process-msg [raw-msg]
|
|
||||||
(let [msg (json/parse-string (.toString raw-msg))]
|
|
||||||
(tel/log! {:level :info :data {:msg msg}} "Processing nostr message")
|
|
||||||
(let [msg-type (first msg) ;; ex: EVENT
|
|
||||||
event (nth msg 2)]
|
|
||||||
(when (and (= (get event "kind") 1) (= msg-type "EVENT"))
|
|
||||||
(-> event
|
|
||||||
transform-post
|
|
||||||
pb/save-post)))))
|
|
||||||
|
|
||||||
(def socket (atom nil))
|
|
||||||
(declare start)
|
|
||||||
(defn connect []
|
|
||||||
(tel/log! :info "Opening websocket connection to nostr relay")
|
|
||||||
(reset! socket @(ws/websocket (@config :nostr-relay)
|
|
||||||
{:headers {"User-Agent" "micro-blog-fetcher"}
|
|
||||||
:on-message (fn [_ws msg _last?]
|
|
||||||
(process-msg msg))
|
|
||||||
:on-close (fn [_ws status reason]
|
|
||||||
(tel/log! {:level :warn :data {:status status :reason reason}} "WebSocket connection closed")
|
|
||||||
(future
|
|
||||||
(Thread/sleep (* 5 60 1000))
|
|
||||||
(tel/log! :info "Reconnecting WebSocket")
|
|
||||||
(start)))})))
|
|
||||||
|
|
||||||
(defn subscribe-to-author [pubkey since]
|
|
||||||
(let [sub-id (@config :nostr-fetcher-id)
|
|
||||||
filter {:kinds [1] :authors [pubkey] :since since}
|
|
||||||
msg (json/generate-string ["REQ" sub-id filter])]
|
|
||||||
(.get (ws/send! @socket msg))))
|
|
||||||
|
|
||||||
(defn close []
|
|
||||||
(tel/log! :info "Closing nostr socket")
|
|
||||||
(ws/close! @socket))
|
|
||||||
|
|
||||||
(defn start []
|
|
||||||
(connect)
|
|
||||||
(subscribe-to-author (@config :nostr-id) (last-post-timestamp)))
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
(ns micro-blog.pocket-base
|
(ns micro-blog.pocket-base
|
||||||
(:require
|
(:require
|
||||||
|
[clojure.pprint :refer [pprint]]
|
||||||
[clojure.string :as str]
|
[clojure.string :as str]
|
||||||
[clj-http.client :as http-client]
|
[clj-http.client :as http-client]
|
||||||
[malli.core :as m]
|
[malli.core :as m]
|
||||||
|
|
@ -39,14 +40,9 @@
|
||||||
new-token))))
|
new-token))))
|
||||||
|
|
||||||
(def source-enum [:enum :pleroma :blue_sky :mastodon :pixelfed :nostr])
|
(def source-enum [:enum :pleroma :blue_sky :mastodon :pixelfed :nostr])
|
||||||
(def is-tech-enum [:enum :yes_ai :no :yes_human])
|
|
||||||
(defn valid-source? [source]
|
(defn valid-source? [source]
|
||||||
(m/validate source-enum source))
|
(m/validate source-enum source))
|
||||||
|
|
||||||
(def post-schema [:map
|
|
||||||
[:id :string]
|
|
||||||
[:remoteId :string]])
|
|
||||||
|
|
||||||
(defn get-all-posts-by-source
|
(defn get-all-posts-by-source
|
||||||
([source] (get-all-posts-by-source source [] 1))
|
([source] (get-all-posts-by-source source [] 1))
|
||||||
([source carry page]
|
([source carry page]
|
||||||
|
|
@ -71,11 +67,14 @@
|
||||||
(concat carry rows)
|
(concat carry rows)
|
||||||
(get-all-posts-by-source source (concat carry rows) (inc page))))))
|
(get-all-posts-by-source source (concat carry rows) (inc page))))))
|
||||||
|
|
||||||
(defn get-latest-post-by-source [source]
|
(defn get-latest-post-remote-id-by-source [source]
|
||||||
(let [res-schema
|
(let [res-schema
|
||||||
[:map
|
[:map
|
||||||
[:items
|
[:items
|
||||||
[:vector post-schema]]]]
|
[:vector
|
||||||
|
[:map
|
||||||
|
[:id string?]
|
||||||
|
[:remoteId string?]]]]]]
|
||||||
(when (not (valid-source? source))
|
(when (not (valid-source? source))
|
||||||
(throw (ex-info "Invalid source" {:source source})))
|
(throw (ex-info "Invalid source" {:source source})))
|
||||||
(as->
|
(as->
|
||||||
|
|
@ -85,17 +84,17 @@
|
||||||
"perPage" 1
|
"perPage" 1
|
||||||
:sort "-posted"
|
:sort "-posted"
|
||||||
:filter (str "source = '" (name source) "'")
|
:filter (str "source = '" (name source) "'")
|
||||||
;; :fields (str/join "," ["remoteId" "id"])
|
:fields (str/join "," ["remoteId" "id"])
|
||||||
"skipTotal" true}
|
"skipTotal" true}
|
||||||
:content-type :json
|
:content-type :json
|
||||||
:as :json}) x
|
:as :json}) x
|
||||||
(:body x)
|
(:body x)
|
||||||
(utils/validate-with-throw x res-schema)
|
(if (m/validate res-schema x)
|
||||||
(-> x :items first))))
|
x
|
||||||
|
(do
|
||||||
(defn get-latest-post-remote-id-by-source [source]
|
(m/explain res-schema x)
|
||||||
(tel/log! {:level :info :data {:source source}} "Fetching latest post remote ID for source")
|
(throw (ex-info "Res does not follow schema" {:res x}))))
|
||||||
(:remoteId (get-latest-post-by-source source)))
|
(-> x :items first :remoteId))))
|
||||||
|
|
||||||
(defn post-with-remote-id-already-saved? [remote-id]
|
(defn post-with-remote-id-already-saved? [remote-id]
|
||||||
(->
|
(->
|
||||||
|
|
@ -179,36 +178,24 @@
|
||||||
(let [save-post-schema [:map
|
(let [save-post-schema [:map
|
||||||
[:source source-enum]
|
[:source source-enum]
|
||||||
[:fullPost :any]
|
[:fullPost :any]
|
||||||
[:isTech is-tech-enum]
|
[:isTech :boolean]
|
||||||
[:tags [:sequential :string]]
|
[:tags [:sequential :string]]
|
||||||
[:images [:sequential [:tuple :string :string]]]
|
[:images [:sequential [:tuple :string :string]]]
|
||||||
[:remoteId :string]
|
[:remoteId :string]
|
||||||
[:authorId :string]
|
[:authorId :string]
|
||||||
[:posted :string]]]
|
[:posted :string]]]
|
||||||
|
|
||||||
(utils/validate-with-throw post save-post-schema)
|
(utils/validate-with-throw post save-post-schema)
|
||||||
(tel/log! {:level :info :data {:remoteId (:remoteId post)}} "Post passed save validation")
|
(tel/log! {:level :info :data {:remoteId (:remoteId post)}} "Post passed save validation")
|
||||||
(if (post-with-remote-id-already-saved? (:remoteId post))
|
(if (post-with-remote-id-already-saved? (:remoteId post))
|
||||||
(tel/log! {:level :warn :data {:remoteId (:remoteId post)}} "Post already saved, skipping")
|
(println "post already saved")
|
||||||
(try
|
(http-client/post (str (@config :pocket-base-host) "/api/collections/micro_blog_posts/records")
|
||||||
(let [tag-ids (doall (map get-tag-id (:tags post)))
|
{:headers {"Authorization" (get-login-token-with-cache)}
|
||||||
image-ids (doall (map #(apply get-image-id %) (:images post)))]
|
:form-params (assoc post
|
||||||
(http-client/post (str (@config :pocket-base-host) "/api/collections/micro_blog_posts/records")
|
:tags (map get-tag-id (:tags post))
|
||||||
{:headers {"Authorization" (get-login-token-with-cache)}
|
:images (map #(apply get-image-id %) (:images post)))
|
||||||
:form-params (assoc post
|
:content-type :json
|
||||||
:source (name (:source post))
|
:as :json}))))
|
||||||
:isTech (name (:isTech post))
|
|
||||||
:tags tag-ids
|
|
||||||
:images image-ids)
|
|
||||||
:content-type :json
|
|
||||||
:as :json}))
|
|
||||||
(catch Exception e
|
|
||||||
(tel/log! {:level :error
|
|
||||||
:id :pocket-base/save-post-error
|
|
||||||
:data {:post post
|
|
||||||
:error (.getMessage e)
|
|
||||||
:exception e}}
|
|
||||||
"Error saving post to pocketbase")
|
|
||||||
(throw e))))))
|
|
||||||
|
|
||||||
(defn set-is-tech [post-id is-tech]
|
(defn set-is-tech [post-id is-tech]
|
||||||
(tel/log! {:level :info :data {:post-id post-id}} "Setting post.is-tech to yes_ai")
|
(tel/log! {:level :info :data {:post-id post-id}} "Setting post.is-tech to yes_ai")
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@
|
||||||
[malli.core :as m]))
|
[malli.core :as m]))
|
||||||
|
|
||||||
(defn validate-with-throw [value schema]
|
(defn validate-with-throw [value schema]
|
||||||
(tel/log! {:level :debug :data {:value value :schema schema}} "Validating value")
|
|
||||||
(if (m/validate schema value)
|
(if (m/validate schema value)
|
||||||
value
|
value
|
||||||
(do
|
(do
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue