diff --git a/bin/mtag/yt-ipr.py b/bin/mtag/yt-ipr.py index 7d656c7f..f1b93f4a 100644 --- a/bin/mtag/yt-ipr.py +++ b/bin/mtag/yt-ipr.py @@ -1,9 +1,13 @@ #!/usr/bin/env python import re +import os import sys import gzip import json +import base64 +import string +import urllib.request from datetime import datetime """ @@ -33,19 +37,30 @@ def main(): txt = "{" + txt.split("{", 1)[1] try: - obj = json.loads(txt) + pd = json.loads(txt) except json.decoder.JSONDecodeError as ex: - obj = json.loads(txt[: ex.pos]) + pd = json.loads(txt[: ex.pos]) - # print(json.dumps(obj, indent=2)) + # print(json.dumps(pd, indent=2)) - vd = obj["videoDetails"] - sd = obj["streamingData"] + if "videoDetails" in pd: + parse_youtube(pd) + else: + parse_freg(pd) + + +def get_expiration(url): + et = re.search(r"[?&]expire=([0-9]+)", url).group(1) + et = datetime.utcfromtimestamp(int(et)) + return et.strftime("%Y-%m-%d, %H:%M") + + +def parse_youtube(pd): + vd = pd["videoDetails"] + sd = pd["streamingData"] et = sd["adaptiveFormats"][0]["url"] - et = re.search(r"[?&]expire=([0-9]+)", et).group(1) - et = datetime.utcfromtimestamp(int(et)) - et = et.strftime("%Y-%m-%d, %H:%M") + et = get_expiration(et) mf = [] if "dashManifestUrl" in sd: @@ -66,9 +81,118 @@ def main(): } print(json.dumps(r)) + freg_conv(pd) + + +def parse_freg(pd): + md = pd["metadata"] + r = { + "yt-id": md["id"], + "yt-title": md["title"], + "yt-author": md["channelName"], + "yt-channel": md["channelURL"].strip("/").split("/")[-1], + "yt-expires": get_expiration(list(pd["video"].values())[0]), + } + print(json.dumps(r)) + + +def freg_conv(pd): + # based on getURLs.js v1.5 (2021-08-07) + # fmt: off + priority = { + "video": [ + 337, 315, 266, 138, # 2160p60 + 313, 336, # 2160p + 308, # 1440p60 + 271, 264, # 1440p + 335, 303, 299, # 1080p60 + 248, 169, 137, # 1080p + 334, 302, 298, # 720p60 + 247, 136 # 720p + ], + "audio": [ + 251, 141, 171, 140, 250, 249, 139 + ] + } + + vid_id = pd["videoDetails"]["videoId"] + chan_id = pd["videoDetails"]["channelId"] + + try: + thumb_url = pd["microformat"]["playerMicroformatRenderer"]["thumbnail"]["thumbnails"][0]["url"] + start_ts = pd["microformat"]["playerMicroformatRenderer"]["liveBroadcastDetails"]["startTimestamp"] + except: + thumb_url = f"https://img.youtube.com/vi/{vid_id}/maxresdefault.jpg" + start_ts = "" + + # fmt: on + + metadata = { + "title": pd["videoDetails"]["title"], + "id": vid_id, + "channelName": pd["videoDetails"]["author"], + "channelURL": "https://www.youtube.com/channel/" + chan_id, + "description": pd["videoDetails"]["shortDescription"], + "thumbnailUrl": thumb_url, + "startTimestamp": start_ts, + } + + if [x for x in vid_id if x not in string.ascii_letters + string.digits + "_-"]: + print(f"malicious json", file=sys.stderr) + return + + basepath = os.path.dirname(sys.argv[1]) + + thumb_fn = f"{basepath}/{vid_id}.jpg" + tmp_fn = f"{thumb_fn}.{os.getpid()}" + if not os.path.exists(thumb_fn) and ( + thumb_url.startswith("https://img.youtube.com/vi/") + or thumb_url.startswith("https://i.ytimg.com/vi/") + ): + try: + with urllib.request.urlopen(thumb_url) as fi: + with open(tmp_fn, "wb") as fo: + fo.write(fi.read()) + + os.rename(tmp_fn, thumb_fn) + except: + if os.path.exists(tmp_fn): + os.unlink(tmp_fn) + + try: + with open(thumb_fn, "rb") as f: + thumb = base64.b64encode(f.read()).decode("ascii") + except: + thumb = "/9j/4AAQSkZJRgABAQEASABIAAD/2wBDAAMCAgICAgMCAgIDAwMDBAYEBAQEBAgGBgUGCQgKCgkICQkKDA8MCgsOCwkJDRENDg8QEBEQCgwSExIQEw8QEBD/yQALCAABAAEBAREA/8wABgAQEAX/2gAIAQEAAD8A0s8g/9k=" + + metadata["thumbnail"] = "data:image/jpeg;base64," + thumb + + ret = { + "metadata": metadata, + "version": "1.5", + "createTime": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"), + } + + for stream, itags in priority.items(): + for itag in itags: + url = None + for afmt in pd["streamingData"]["adaptiveFormats"]: + if itag == afmt["itag"]: + url = afmt["url"] + break + + if url: + ret[stream] = {itag: url} + break + + fn = f"{basepath}/{vid_id}.urls.json" + with open(fn, "w", encoding="utf-8", errors="replace") as f: + f.write(json.dumps(ret, indent=4)) + if __name__ == "__main__": try: main() except: + # raise pass