diff --git a/deps.edn b/deps.edn index 0a35e3d..47e2f36 100644 --- a/deps.edn +++ b/deps.edn @@ -15,7 +15,9 @@ ;; json cheshire/cheshire {:mvn/version "6.0.0"} ;; metosin/muuntaja {:mvn/version "0.6.11"} - org.clojure/clojure {:mvn/version "1.12.1"}} + org.clojure/clojure {:mvn/version "1.12.1"} + ;; websockets + hato/hato {:mvn/version "1.0.0"}} :aliases {;; Run with clj -T:build function-in-build diff --git a/src/micro_blog/blue_sky.clj b/src/micro_blog/blue_sky.clj index 1c9579c..d491dac 100644 --- a/src/micro_blog/blue_sky.clj +++ b/src/micro_blog/blue_sky.clj @@ -3,6 +3,7 @@ [clj-http.client :as http-client] [micro-blog.pocket-base :as pb] [micro-blog.utils :as utils] + [cheshire.core :as json] [micro-blog.is-tech] [taoensso.telemere :as tel] [micro-blog.config :refer [config]])) @@ -82,10 +83,11 @@ (map #(vector (:fullsize %) (:alt %)) images))) (defn transform-post [post] + (tel/log! {:level :info :data {:post post}} "Transforming post") (hash-map :source :blue_sky :fullPost post :remoteId (:cid post) - :isTech (micro-blog.is-tech/is-tech? (:record post)) + :isTech (micro-blog.is-tech/is-tech? (json/generate-string (:record post))) :authorId (get-in post [:author :handle]) :tags (extract-tags post) :images (extract-images post) diff --git a/src/micro_blog/config.clj b/src/micro_blog/config.clj index c12eaef..ad9372b 100644 --- a/src/micro_blog/config.clj +++ b/src/micro_blog/config.clj @@ -8,7 +8,11 @@ :mastodon-host "MASTODON_BASE_URL" :mastodon-account-id "MASTODON_ACCOUNT_ID" :api-host "API_HOST" - :api-port "API_PORT"}) + :api-port "API_PORT" + ;; :nostr-fetcher-npub "NOSTR_FETCHER_NPUB" + :nostr-fetcher-id "NOSTR_FETCHER_PUB_HEX" + :nostr-id "NOSTR_ID" + :nostr-relay "NOSTR_RELAY"}) (defn- load-config [] (merge (read-string (slurp "config.edn")) diff --git a/src/micro_blog/is_tech.clj b/src/micro_blog/is_tech.clj index 6a33047..49eb22d 100644 --- a/src/micro_blog/is_tech.clj +++ b/src/micro_blog/is_tech.clj @@ -5,6 +5,7 @@ [clj-http.client :as client])) (defn is-tech? [post-text] + (when (not (string? post-text)) (throw (ex-info "Post text must be a string" {:post-text post-text}))) (let [url (str (:mistral-host @config) "/v1/conversations") headers {"Content-Type" "application/json" "Accept" "application/json" @@ -12,7 +13,7 @@ body {:inputs post-text :stream false :agent_id (@config :mistral-agent-id)}] - (tel/log! {:level :info :data {:url url :agent_id (:agent_id body)}} "making request to mistral agent") + (tel/log! {:level :info :data {:url url :agent_id (:agent_id body) :post-text post-text}} "making request to mistral agent") (-> (client/post url {:headers headers :form-params body diff --git a/src/micro_blog/main.clj b/src/micro_blog/main.clj index cb84b05..c4ec6b9 100644 --- a/src/micro_blog/main.clj +++ b/src/micro_blog/main.clj @@ -5,6 +5,7 @@ [taoensso.telemere :as tel] [micro-blog.logging.main :as logging] micro-blog.api + [micro-blog.nostr :as nostr] [micro-blog.blue-sky :as blue-sky] [micro-blog.mastodon :as masto])) @@ -15,6 +16,8 @@ (logging/setup-logging) (tel/log! :info "Setting up API") (micro-blog.api/start) + (tel/log! :info "Setting up nostr scraper") + (nostr/start) (tel/log! :info "Setting up crons") (doseq [[i cron] (map-indexed vector crons)] (let [start (.plus (Instant/now) (Duration/ofMinutes (* i 5)))] diff --git a/src/micro_blog/mastodon.clj b/src/micro_blog/mastodon.clj index 8b7f027..24b7d0d 100644 --- a/src/micro_blog/mastodon.clj +++ b/src/micro_blog/mastodon.clj @@ -17,7 +17,7 @@ [:media_attachments [:vector [:map [:url :string] [:type [:= "image"]] - [:description :string]]]]]]) + [:description [:maybe :string]]]]]]]) (defn get-posts-until-id [id] (let [limit 10 @@ -50,7 +50,7 @@ :remoteId (:id raw-post) :authorId (get-in raw-post [:account :id]) :tags (map :name (:tags raw-post)) - :images (map (fn [img] [(:url img) (:description img)]) (:media_attachments raw-post)) + :images (map (fn [img] [(:url img) (or (:description img) "")]) (:media_attachments raw-post)) :posted (:created_at raw-post))) (defn save-post [post] diff --git a/src/micro_blog/nostr.clj b/src/micro_blog/nostr.clj new file mode 100644 index 0000000..1aecdc5 --- /dev/null +++ b/src/micro_blog/nostr.clj @@ -0,0 +1,76 @@ +(ns micro-blog.nostr + (:require + [micro-blog.pocket-base :as pb] + [micro-blog.is-tech] + [taoensso.telemere :as tel] + [hato.websocket :as ws] + [cheshire.core :as json] + [clojure.string :as str] + [micro-blog.config :refer [config]]) + (:import + [java.time Instant OffsetDateTime ZoneOffset] + [java.time.format DateTimeFormatter])) + +(defn pb-date-to-unix-timestamp-seconds [date-str] + (-> date-str + (str/replace " " "T") + (Instant/parse) + (.getEpochSecond))) + +(defn nostr-date-to-pb [ts] + (let [instant (java.time.Instant/ofEpochSecond ts) + formatter (java.time.format.DateTimeFormatter/ofPattern "yyyy-MM-dd HH:mm:ss.SSSX") + zoned (.atZone instant java.time.ZoneOffset/UTC)] + (.format formatter zoned))) + +(defn last-post-timestamp [] + (pb-date-to-unix-timestamp-seconds (:posted (pb/get-latest-post-by-source :nostr)))) + +(defn transform-post [post] + (tel/log! {:level :info :data {:post post}} "Transforming nostr post") + (hash-map :source :nostr + :fullPost post + :remoteId (get post "id") + :isTech (micro-blog.is-tech/is-tech? (get post "content")) + :authorId (get post "pubkey") + :tags (reduce (fn [acc tag] + (let [tag-type (first tag) + tag-value (second tag)] + (if (= tag-type "t") + (conj acc tag-value) + acc))) [] (get post "tags")) + :images [] + :posted (nostr-date-to-pb (get post "created_at")))) + +(defn process-msg [raw-msg] + (let [msg (json/parse-string (.toString raw-msg))] + (tel/log! {:level :info :data {:msg msg}} "Processing nostr message") + (let [msg-type (first msg) ;; ex: EVENT + event (nth msg 2)] + (when (and (= (get event "kind") 1) (= msg-type "EVENT")) + (-> event + transform-post + pb/save-post))))) + +(def socket (atom nil)) +(defn connect [] + (tel/log! :info "Opening websocket connection to nostr relay") + (reset! socket @(ws/websocket (@config :nostr-relay) + {:headers {"User-Agent" "micro-blog-fetcher"} + :on-message (fn [_ws msg _last?] + (process-msg msg)) + :on-close (fn [_ws status reason] + (tel/log! {:level :warn :data {:status status :reason reason}} "WebSocket connection closed"))}))) + +(defn subscribe-to-author [pubkey since] + (let [sub-id (@config :nostr-fetcher-id) + filter {:kinds [1] :authors [pubkey] :since since} + msg (json/generate-string ["REQ" sub-id filter])] + (.get (ws/send! @socket msg)))) + +(defn close [] + (ws/close! @socket)) + +(defn start [] + (connect) + (subscribe-to-author (@config :nostr-id) (last-post-timestamp))) diff --git a/src/micro_blog/pocket_base.clj b/src/micro_blog/pocket_base.clj index a4dc520..515ac85 100644 --- a/src/micro_blog/pocket_base.clj +++ b/src/micro_blog/pocket_base.clj @@ -1,6 +1,5 @@ (ns micro-blog.pocket-base (:require - [clojure.pprint :refer [pprint]] [clojure.string :as str] [clj-http.client :as http-client] [malli.core :as m] @@ -44,6 +43,10 @@ (defn valid-source? [source] (m/validate source-enum source)) +(def post-schema [:map + [:id :string] + [:remoteId :string]]) + (defn get-all-posts-by-source ([source] (get-all-posts-by-source source [] 1)) ([source carry page] @@ -68,14 +71,11 @@ (concat carry rows) (get-all-posts-by-source source (concat carry rows) (inc page)))))) -(defn get-latest-post-remote-id-by-source [source] +(defn get-latest-post-by-source [source] (let [res-schema [:map [:items - [:vector - [:map - [:id string?] - [:remoteId string?]]]]]] + [:vector post-schema]]]] (when (not (valid-source? source)) (throw (ex-info "Invalid source" {:source source}))) (as-> @@ -85,17 +85,17 @@ "perPage" 1 :sort "-posted" :filter (str "source = '" (name source) "'") - :fields (str/join "," ["remoteId" "id"]) + ;; :fields (str/join "," ["remoteId" "id"]) "skipTotal" true} :content-type :json :as :json}) x (:body x) - (if (m/validate res-schema x) - x - (do - (m/explain res-schema x) - (throw (ex-info "Res does not follow schema" {:res x})))) - (-> x :items first :remoteId)))) + (utils/validate-with-throw x res-schema) + (-> x :items first)))) + +(defn get-latest-post-remote-id-by-source [source] + (tel/log! {:level :info :data {:source source}} "Fetching latest post remote ID for source") + (:remoteId (get-latest-post-by-source source))) (defn post-with-remote-id-already-saved? [remote-id] (-> @@ -185,11 +185,10 @@ [:remoteId :string] [:authorId :string] [:posted :string]]] - (utils/validate-with-throw post save-post-schema) (tel/log! {:level :info :data {:remoteId (:remoteId post)}} "Post passed save validation") (if (post-with-remote-id-already-saved? (:remoteId post)) - (println "post already saved") + (tel/log! {:level :warn :data {:remoteId (:remoteId post)}} "Post already saved, skipping") (try (http-client/post (str (@config :pocket-base-host) "/api/collections/micro_blog_posts/records") {:headers {"Authorization" (get-login-token-with-cache)} diff --git a/src/micro_blog/utils.clj b/src/micro_blog/utils.clj index be84ba5..931028d 100644 --- a/src/micro_blog/utils.clj +++ b/src/micro_blog/utils.clj @@ -4,6 +4,7 @@ [malli.core :as m])) (defn validate-with-throw [value schema] + (tel/log! {:level :debug :data {:value value :schema schema}} "Validating value") (if (m/validate schema value) value (do