|
|
|
|
@ -23,53 +23,114 @@
|
|
|
|
|
|
|
|
|
|
import os,sys |
|
|
|
|
import requests |
|
|
|
|
import argparse |
|
|
|
|
from datetime import datetime |
|
|
|
|
from pprint import pprint |
|
|
|
|
|
|
|
|
|
from typing import Dict |
|
|
|
|
|
|
|
|
|
current_dir = os.path.dirname(os.path.abspath(__file__)) |
|
|
|
|
sys.path.append(current_dir) |
|
|
|
|
from social_common import get_social_data, SocialData, get_env_var |
|
|
|
|
|
|
|
|
|
identifier=get_env_var('BLUESKY_APP_IDENTIFIER') |
|
|
|
|
password=get_env_var('BLUESKY_APP_PWD') |
|
|
|
|
import re |
|
|
|
|
from typing import List, Dict |
|
|
|
|
|
|
|
|
|
def _get_facet(txt:str, slice:str, content:Dict ) -> Dict: |
|
|
|
|
start = txt.index(slice) |
|
|
|
|
end = start + len(slice) |
|
|
|
|
return { |
|
|
|
|
"index": { "byteStart": start, "byteEnd": end }, |
|
|
|
|
"features": [ content ] |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(): |
|
|
|
|
parser = argparse.ArgumentParser(description="Send a Discord notification.") |
|
|
|
|
parser.add_argument("app", type=str, help="The application name.") |
|
|
|
|
parser.add_argument("version", type=str, help="The application version.") |
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
|
data = get_social_data(args.app) |
|
|
|
|
if not data: |
|
|
|
|
raise ValueError(f"app: {args.app} is not recognised") |
|
|
|
|
|
|
|
|
|
identifier=get_env_var('BLUESKY_APP_IDENTIFIER') |
|
|
|
|
password=get_env_var('BLUESKY_APP_PWD') |
|
|
|
|
|
|
|
|
|
# Step 1: Authenticate and get access token |
|
|
|
|
auth_response = requests.post( |
|
|
|
|
"https://bsky.social/xrpc/com.atproto.server.createSession", |
|
|
|
|
# Step 1: Authenticate and get access token |
|
|
|
|
auth_response = requests.post( |
|
|
|
|
"https://bsky.social/xrpc/com.atproto.server.createSession", |
|
|
|
|
headers = { |
|
|
|
|
"Content-Type": "application/json" |
|
|
|
|
}, |
|
|
|
|
json={ |
|
|
|
|
"identifier": identifier, |
|
|
|
|
"password": password |
|
|
|
|
} |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
auth_data = auth_response.json() |
|
|
|
|
# print(auth_data) |
|
|
|
|
access_token = auth_data["accessJwt"] |
|
|
|
|
did = auth_data["did"] |
|
|
|
|
print(f"::add-mask::{did}") |
|
|
|
|
|
|
|
|
|
# Step 2: Post a message |
|
|
|
|
headers = { |
|
|
|
|
"Authorization": f"Bearer {access_token}", |
|
|
|
|
"Content-Type": "application/json" |
|
|
|
|
}, |
|
|
|
|
json={ |
|
|
|
|
"identifier": identifier, |
|
|
|
|
"password": password |
|
|
|
|
} |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
auth_data = auth_response.json() |
|
|
|
|
# print(auth_data) |
|
|
|
|
access_token = auth_data["accessJwt"] |
|
|
|
|
did = auth_data["did"] |
|
|
|
|
|
|
|
|
|
# Step 2: Post a message |
|
|
|
|
headers = { |
|
|
|
|
"Authorization": f"Bearer {access_token}", |
|
|
|
|
"Content-Type": "application/json" |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
post_data = { |
|
|
|
|
"repo": did, |
|
|
|
|
"collection": "app.bsky.feed.post", |
|
|
|
|
"record": { |
|
|
|
|
"$type": "app.bsky.feed.post", |
|
|
|
|
"text": "Hello from my Python bot!", |
|
|
|
|
"createdAt": datetime.now().isoformat() + "Z" |
|
|
|
|
|
|
|
|
|
text = f"{args.app} v{args.version} Released\n\n{data.link}\n\n#pyTermTk #TUI #Python #Linux #terminal" |
|
|
|
|
|
|
|
|
|
post_data = { |
|
|
|
|
"repo": did, |
|
|
|
|
"collection": "app.bsky.feed.post", |
|
|
|
|
"record": { |
|
|
|
|
"$type": "app.bsky.feed.post", |
|
|
|
|
"text":text, |
|
|
|
|
"facets": [ |
|
|
|
|
_get_facet( |
|
|
|
|
text, data.link, |
|
|
|
|
{ "uri": data.link , "$type": "app.bsky.richtext.facet#link" } |
|
|
|
|
), |
|
|
|
|
_get_facet( |
|
|
|
|
text, '#pyTermTk', |
|
|
|
|
{ "tag": 'pyTermTk' , "$type": "app.bsky.richtext.facet#tag" } |
|
|
|
|
), |
|
|
|
|
_get_facet( |
|
|
|
|
text, '#TUI', |
|
|
|
|
{ "tag": 'TUI' , "$type": "app.bsky.richtext.facet#tag" } |
|
|
|
|
), |
|
|
|
|
_get_facet( |
|
|
|
|
text, '#Python', |
|
|
|
|
{ "tag": 'Python' , "$type": "app.bsky.richtext.facet#tag" } |
|
|
|
|
), |
|
|
|
|
_get_facet( |
|
|
|
|
text, '#Linux', |
|
|
|
|
{ "tag": 'Linux' , "$type": "app.bsky.richtext.facet#tag" } |
|
|
|
|
), |
|
|
|
|
_get_facet( |
|
|
|
|
text, '#terminal', |
|
|
|
|
{ "tag": 'terminal' , "$type": "app.bsky.richtext.facet#tag" } |
|
|
|
|
), |
|
|
|
|
], |
|
|
|
|
"createdAt": datetime.now().isoformat() + "Z" |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
post_response = requests.post( |
|
|
|
|
"https://bsky.social/xrpc/com.atproto.repo.createRecord", |
|
|
|
|
headers=headers, |
|
|
|
|
json=post_data |
|
|
|
|
) |
|
|
|
|
print('::group::Data') |
|
|
|
|
pprint(post_data) |
|
|
|
|
print('::endgroup::') |
|
|
|
|
|
|
|
|
|
post_response = requests.post( |
|
|
|
|
"https://bsky.social/xrpc/com.atproto.repo.createRecord", |
|
|
|
|
headers=headers, |
|
|
|
|
json=post_data |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
print(post_response.status_code) |
|
|
|
|
print(post_response.json()) |
|
|
|
|
print(post_response.status_code) |
|
|
|
|
print(post_response.json()) |
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
main() |