newpipe-ttrss-sync/yt_sub.py

112 lines
3.6 KiB
Python

import requests
import json
import pdb
from urllib.parse import urlparse
from getpass import getpass
from bs4 import BeautifulSoup
BASE_FEED_URL = "https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}"
TTRSS_API_URL = "https://rss.geekify.de/api/"
def get_channel_feed_url(sub_url):
channel_id = ""
feed_url = ""
url = urlparse(sub_url)
if "/channel" in url.path:
channel_id = url.path.split("/")[-1]
feed_url = BASE_FEED_URL.format(channel_id=channel_id)
elif "/user" in url.path:
req = requests.get(url_str)
soup = BeautifulSoup(req.text, "html.parser")
# we're looking for something like this:
# <link rel="canonical" href="https://www.youtube.com/channel/UC2C_jShtL725hvbm1arSV9w">
channel_link_extracted = soup.find_all("link", rel="canonical")[0].attrs.get("href")
url = urlparse(channel_link_extracted)
channel_id = url.path.split("/")[-1]
feed_url = BASE_FEED_URL.format(channel_id=channel_id)
else:
debug_msg = "Unknown url ({url}) for user \"{name}\""
print(debug_msg.format(url=url_str))
feed_url = ""
return feed_url
class TTRSS():
username = ""
__password = ""
api_endpoint = ""
sid = ""
def __init__(self, api_endpoint="", username="", password=""):
self.username = username
self.api_endpoint = api_endpoint
self.api_level = -1
self.__password = password
self.login()
pass
def login(self):
data = self.__build_request(op="login", user=self.username, password=self.__password)
resp = self.__send_request(data)
self.sid = resp.get("session_id", "")
self.api_level = resp.get("api_level", -1)
if len(self.sid) > 0:
return True
else:
return False
def getCategories(self):
data = self.__build_request(sid=self.sid, op="getCategories", unread_only="f", enable_nested="t", include_empty="f")
resp = self.__send_request(data)
return resp
def subscribeToFeed(self, feed_url, category_id):
data = self.__build_request(sid=self.sid, op="subscribeToFeed", feed_url=feed_url, category_id=category_id)
resp = self.__send_request(data)
return resp
def getFeeds(self, cat_id):
data = self.__build_request(sid=self.sid, op="getFeeds", cat_id=cat_id, unread_only="f")
resp = self.__send_request(data)
return resp
def __build_request(self, *args, **kwargs):
request = kwargs
return request
def __send_request(self, data):
req = requests.post(self.api_endpoint, data=json.dumps(data))
resp = req.json()
if resp.get("status", -1) == 0:
return resp.get("content", {})
else:
print("REQUEST FAILED!")
return {}
print("TinyTinyRSS credentials:")
ttrss = TTRSS(TTRSS_API_URL, input("Username: "), getpass())
cats = ttrss.getCategories()
yt_cat_id_obj = next(filter(lambda x: "YouTube" in x.get("title", ""), cats))
yt_cat_id = yt_cat_id_obj.get("id")
feeds = ttrss.getFeeds(yt_cat_id)
already_subed = []
for feed in feeds:
already_subed.append(feed.get("feed_url"))
with open("subs.json", "r") as f:
newpipe_json = json.loads(f.read())
subs = newpipe_json.get("subscriptions", {})
for sub in subs:
if sub.get("service_id", -1) == 0: # this indicates a youtube link
url_str = sub.get("url", "")
feed_url = get_channel_feed_url(url_str)
if feed_url in already_subed:
print("Already subscribed to: {}".format(sub.get("name")))
else:
print("Subscribing to: {}".format(sub.get("name")), end="")
resp = ttrss.subscribeToFeed(feed_url, yt_cat_id)
if resp.get("status", {}).get("feed_id", -1) > 0:
print("SUCCESS!")
else:
print("FAILED!")