(oEmbed)
This commit is contained in:
MMaker 2025-02-25 17:56:06 -05:00
parent e9eb88c13f
commit e48159ce14
Signed by: mmaker
GPG Key ID: CCE79B8FEDA40FB2

124
app.py
View File

@ -3,7 +3,7 @@ import http.cookiejar
import json
import requests
from bs4 import BeautifulSoup
from flask import Flask, Response
from flask import Flask, Response, request, jsonify
from diskcache import Cache
import logging
@ -13,6 +13,7 @@ import tempfile
import nndownload
import boto3
from botocore.client import Config as BotoConfig
import urllib.parse
from dotenv import load_dotenv
load_dotenv()
@ -21,6 +22,7 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
HOST = os.environ.get('NICONICOGAY_HOST', 'https://nicovideo.gay')
S3_BUCKET_NAME = os.environ.get('NICONICOGAY_S3_BUCKET_NAME')
S3_REGION = os.environ.get('NICONICOGAY_S3_REGION')
CDN_BASE_URL = os.environ.get('NICONICOGAY_CDN_BASE_URL')
@ -179,6 +181,58 @@ def get_video_quality(params, quality_level_threshold=3):
return None
return str(max(eligible_videos, key=lambda x: int(x['qualityLevel']))['id'])
def get_data(video_id, real_url):
"""Get the server response for a given video ID"""
try:
logger.info(f"Fetching content from URL: {real_url}")
r = s.get(real_url, timeout=10)
r.raise_for_status()
except requests.RequestException as e:
logger.error(f"Error fetching the page for video ID '{video_id}': {e}")
return None, None
soup = BeautifulSoup(r.text, "html.parser")
try:
server_response = soup.find("meta", {"name": "server-response"})
if server_response:
params = json.loads(server_response["content"])["data"]["response"] # type: ignore
return params, soup
except (KeyError, json.JSONDecodeError) as e:
logger.warning(f"Failed to extract thumbnail info for video ID '{video_id}': {e}")
pass
return None, soup
def get_oembed_url(params):
"""Get the oEmbed (/owoembed) URL based on the given params (server response)"""
author_id = params.get('owner', {}).get('id')
author_name = params.get('owner', {}).get('nickname')
video_id = params.get('video', {}).get('id')
if not video_id:
return None
view_count = str(params.get('video', {}).get('count', {}).get('view')) or "n/a"
comment_count = str(params.get('video', {}).get('count', {}).get('comment')) or "n/a"
like_count = str(params.get('video', {}).get('count', {}).get('like')) or "n/a"
mylist_count = str(params.get('video', {}).get('count', {}).get('mylist')) or "n/a"
provder_stats = f"👁️ {view_count} 💬 {comment_count} ❤️ {like_count} 📋 {mylist_count}"
author_name_encoded = urllib.parse.quote(author_name)
provider_stats_encoded = urllib.parse.quote(provder_stats)
oembed_url = (
f"{HOST}/owoembed?"
f"author_id={author_id}&"
f"author_name={author_name_encoded}&"
f"video_id={video_id}&"
f"provider={provider_stats_encoded}"
)
return oembed_url
@app.route("/watch/<video_id>")
def proxy(video_id):
logger.info(f"Received request for video ID: {video_id}")
@ -191,30 +245,14 @@ def proxy(video_id):
# Not in cache or cache expired; fetch from nicovideo.jp
real_url = f"https://www.nicovideo.jp/watch/{video_id}"
try:
logger.info(f"Fetching content from URL: {real_url}")
r = s.get(real_url, timeout=10)
r.raise_for_status()
except requests.RequestException as e:
logger.error(f"Error fetching the page for video ID '{video_id}': {e}")
return Response(status=500)
soup = BeautifulSoup(r.text, "html.parser")
thumbnail_url = None
try:
server_response = soup.find("meta", {"name": "server-response"})
if server_response:
params = json.loads(server_response["content"])["data"]["response"] # type: ignore
thumbnail_url = (
params["video"]["thumbnail"].get("ogp") or
params["video"]["thumbnail"].get("player") or
params["video"]["thumbnail"].get("largeUrl") or
params["video"]["thumbnail"].get("middleUrl") or
params["video"]["thumbnail"].get("url")
)
except (KeyError, json.JSONDecodeError) as e:
logger.warning(f"Failed to extract thumbnail info for video ID '{video_id}': {e}")
pass
params, soup = get_data(video_id, real_url)
thumbnail_url = (
params["video"]["thumbnail"].get("ogp") or
params["video"]["thumbnail"].get("player") or
params["video"]["thumbnail"].get("largeUrl") or
params["video"]["thumbnail"].get("middleUrl") or
params["video"]["thumbnail"].get("url")
) if params else None
download_allowed = allow_download(params) if params else False
video_quality = get_video_quality(params) if params else None
@ -254,6 +292,7 @@ def proxy(video_id):
og_tags_str += f'\n<meta content="{og_video_width}" property="twitter:player:width"/>'
if og_video_height:
og_tags_str += f'\n<meta content="{og_video_height}" property="twitter:player:height"/>'
og_tags_str += f'\n<link rel="alternate" href="{get_oembed_url(params)}" type="application/json+oembed" title="{video_id}"/>'
html_response = f"""<!DOCTYPE html>
<!--
niconico proxy - brought to you by https://mmaker.moe
@ -271,3 +310,38 @@ if you want to download videos, please consider using a tool like nndownload: ht
cache.set(video_id, html_response, expire=CACHE_EXPIRATION_SECONDS)
return Response(html_response, mimetype="text/html")
@app.route("/owoembed")
def owoembed():
"""
Handles oEmbed requests with parameters in the URL
Returns JSON payload in oEmbed format
"""
logger.info("Received request for oEmbed endpoint")
# Get parameters from query string
author_id = request.args.get('author_id', '')
author_name = request.args.get('author_name', '')
video_id = request.args.get('video_id', '')
provider = request.args.get('provider', '')
author_name_decoded = urllib.parse.unquote(author_name)
provider_decoded = urllib.parse.unquote(provider)
# Create the author_url and provider_url
author_url = f"https://www.nicovideo.jp/user/{author_id}"
video_url = f"https://www.nicovideo.jp/watch/{video_id}"
# Create oEmbed response
oembed_response = {
"author_name": author_name_decoded,
"author_url": author_url,
"provider_name": provider_decoded,
"provider_url": video_url,
"title": "Embed",
"type": "link",
"version": "1.0"
}
logger.info(f"Returning oEmbed response for video ID: {video_id}")
return jsonify(oembed_response)