import os import http.cookiejar import json import re import requests from bs4 import BeautifulSoup from flask import Flask, Response, request, jsonify from diskcache import Cache import logging import threading import time import tempfile import nndownload import boto3 from botocore.client import Config as BotoConfig import urllib.parse from dotenv import load_dotenv load_dotenv() logging.basicConfig( level=logging.INFO, format='%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR) logger = logging.getLogger(__name__) logger.setLevel(os.environ.get('NICONICOGAY_LOG', 'INFO').upper()) app = Flask(__name__) HOST = os.environ.get('NICONICOGAY_HOST', 'https://nicovideo.gay') S3_BUCKET_NAME = os.environ.get('NICONICOGAY_S3_BUCKET_NAME') S3_REGION = os.environ.get('NICONICOGAY_S3_REGION') CDN_BASE_URL = os.environ.get('NICONICOGAY_CDN_BASE_URL') MAX_CONCURRENT_DOWNLOADS = 3 CACHE_EXPIRATION_HTML = 60 * 60 # 1 hour CACHE_EXPIRATION_CDN = 60 * 60 * 24 * 7 # 1 week CACHE_SIZE_LIMIT = 100 * 1024 * 1024 # 100 MB cache = None if os.environ.get('NICONICOGAY_DISABLE_CACHE', '') != '1': cache = Cache("disk_cache", size_limit=CACHE_SIZE_LIMIT) logger.debug("Using disk cache") else: logger.info("Disk cache disabled") user_session = None cookie_jar = http.cookiejar.MozillaCookieJar('cookies.txt') try: cookie_jar.load(ignore_discard=True, ignore_expires=True) user_session = next((cookie.value for cookie in cookie_jar if cookie.name == 'user_session'), None) except FileNotFoundError: logger.info("cookies.txt not found, starting with empty cookie jar") s = requests.Session() s.headers.update({ "User-Agent": os.environ.get('NICONICOGAY_USER_AGENT', 'Twitterbot/1.0') }) s.cookies = cookie_jar # type: ignore s3_client = None if all(key in os.environ for key in [ 'NICONICOGAY_S3_ACCESS_KEY', 'NICONICOGAY_S3_SECRET_KEY', ]): s3_session = boto3.Session() s3_client = s3_session.client( 's3', aws_access_key_id=os.environ['NICONICOGAY_S3_ACCESS_KEY'], aws_secret_access_key=os.environ['NICONICOGAY_S3_SECRET_KEY'], region_name=S3_REGION, endpoint_url=f"https://{S3_REGION}.digitaloceanspaces.com", config=BotoConfig(s3={'addressing_style': 'virtual'}), ) else: logger.info("S3 credentials not provided. Videos will not be downloaded.") download_tracker = { 'active_downloads': 0, 'in_progress': set(), } download_lock = threading.Lock() download_queue = [] def download_and_upload_video(video_id, url, video_quality): try: with download_lock: download_tracker['active_downloads'] += 1 download_tracker['in_progress'].add(video_id) with tempfile.NamedTemporaryFile(suffix='.mp4', delete=True) as temp_file: temp_path = temp_file.name try: logger.info(f"{video_id}: Starting download") nndownload_args = [ "--no-login", "--user-agent", "Googlebot/2.1", "--video-quality", video_quality, "--output-path", temp_path, url ] if user_session: nndownload_args += ["--session-cookie", user_session] nndownload_args = nndownload_args[1:] nndownload.execute(*nndownload_args) if os.path.exists(temp_path) and s3_client and S3_BUCKET_NAME: logger.info(f"{video_id}: Downloaded, uploading to CDN") try: s3_key = f"niconico/{video_id}.mp4" s3_client.upload_file( temp_path, S3_BUCKET_NAME, s3_key, ExtraArgs={'ContentType': 'video/mp4', 'ACL': 'public-read'} ) logger.info(f"{video_id}: Upload successful to CDN") if cache is not None: cache.set(f"{video_id}_cdn", True, expire=CACHE_EXPIRATION_CDN) # Clear HTML cache for this video to ensure next view gets updated HTML cache.delete(f"{video_id}_html") logger.debug(f"{video_id}: Cleared HTML cache") return True except Exception as e: logger.error(f"{video_id}: Error uploading to CDN: {e}") return False else: logger.error(f"{video_id}: Failed to download or S3 client not configured") return False finally: if os.path.exists(temp_path): os.unlink(temp_path) logger.debug(f"Removed temporary file: {temp_path}") except Exception as e: logger.error(f"{video_id}: Error in download process: {e}") return False finally: with download_lock: download_tracker['active_downloads'] -= 1 download_tracker['in_progress'].discard(video_id) def download_worker(): while True: try: with download_lock: can_download = download_tracker['active_downloads'] < MAX_CONCURRENT_DOWNLOADS queue_has_items = len(download_queue) > 0 if queue_has_items and can_download: with download_lock: # Get next video that is not already being downloaded for i, (video_id, _, _) in enumerate(download_queue): if video_id not in download_tracker['in_progress']: video_info = download_queue.pop(i) threading.Thread(target=download_and_upload_video, args=(video_info[0], video_info[1], video_info[2])).start() break time.sleep(1) except Exception as e: logger.error(f"Error in download worker: {e}") time.sleep(5) # Back off in case of error worker_thread = threading.Thread(target=download_worker, daemon=True) worker_thread.start() def is_video_in_cdn(video_id): """Check if video exists in CDN""" if cache is not None and cache.get(f"{video_id}_cdn"): logger.debug(f"{video_id}: Already uploaded to CDN (cached)") return True if not s3_client or not S3_BUCKET_NAME: logger.warning("S3 client not configured. Cannot check if video exists in CDN.") return False try: s3_client.head_object(Bucket=S3_BUCKET_NAME, Key=f"niconico/{video_id}.mp4") return True except Exception: return False def is_video_being_downloaded(video_id): """Check if video is currently being downloaded""" with download_lock: return video_id in download_tracker['in_progress'] def get_cdn_url(video_id): """Get the CDN URL for a video""" return f"{CDN_BASE_URL}/niconico/{video_id}.mp4" def get_video_resolution(params): if not params: return None, None video = params['media']['domand']['videos'][0] return video['width'], video['height'] def get_video_quality(params, quality_level_threshold=3): """Get the code of the best video quality available (optionally below a certain threshold)""" videos = params['media']['domand']['videos'] eligible_videos = [v for v in videos if v['qualityLevel'] < quality_level_threshold] if not eligible_videos: return None return str(max(eligible_videos, key=lambda x: int(x['qualityLevel']))['id']) def get_data(video_id, real_url): """Get the server response for a given video ID""" try: logger.debug(f"{video_id}: Fetching content from URL: {real_url}") r = s.get(real_url, timeout=10) # r.raise_for_status() except requests.RequestException as e: logger.error(f"{video_id}: Error fetching the page ('{real_url}'): {e}") return None, None soup = BeautifulSoup(r.text, "html.parser") try: server_response = soup.find("meta", {"name": "server-response"}) if server_response: params = json.loads(server_response["content"])["data"]["response"] # type: ignore return params, soup except (KeyError, json.JSONDecodeError) as e: logger.warning(f"{video_id}: Failed to extract thumbnail info: {e}") pass return None, soup def human_format(num): """Format a number in a human-readable way (e.g., 1K, 2M, etc.)""" if num is None: return None num = float('{:.3g}'.format(num)) magnitude = 0 while abs(num) >= 1000: magnitude += 1 num /= 1000.0 return '{}{}'.format('{:f}'.format(num).rstrip('0').rstrip('.'), ['', 'K', 'M', 'B', 'T'][magnitude]) def get_oembed_url(params): """Get the oEmbed (/owoembed) URL based on the given params (server response)""" if not params: return None author_id = None author_name = None if params.get('owner'): author_id = params['owner'].get('id') author_name = params['owner'].get('nickname') video_id = params.get('video', {}).get('id') if not video_id: return None view_count = human_format(params.get('video', {}).get('count', {}).get('view')) or "n/a" comment_count = human_format(params.get('video', {}).get('count', {}).get('comment')) or "n/a" like_count = human_format(params.get('video', {}).get('count', {}).get('like')) or "n/a" mylist_count = human_format(params.get('video', {}).get('count', {}).get('mylist')) or "n/a" provder_stats = f"👁️ {view_count} 💬 {comment_count} ❤️ {like_count} 📝 {mylist_count}" author_name_encoded = urllib.parse.quote(author_name) if author_name else "" provider_stats_encoded = urllib.parse.quote(provder_stats) oembed_url = ( f"{HOST}/owoembed?" f"author_id={author_id if author_id else ''}&" f"author_name={author_name_encoded}&" f"video_id={video_id}&" f"provider={provider_stats_encoded}" ) return oembed_url @app.route("/watch/") def proxy(video_id): logger.info(f"{video_id}: Received request") cache_html_suffix = "_html" request_user_agent = request.headers.get('User-Agent', '').lower() if 'twitterbot' in request_user_agent: cache_html_suffix = "_html_twitterbot" elif 'discordbot' in request_user_agent: cache_html_suffix = "_html_discordbot" if cache is not None: logger.debug(f"{video_id}: Checking cache") cached_html = cache.get(f"{video_id}{cache_html_suffix}") if cached_html is not None: logger.info(f"{video_id}: Returning cached response") return Response(cached_html, mimetype="text/html") # type: ignore logger.debug(f"{video_id}: Cache miss - fetching") # Not in cache or cache expired; fetch from nicovideo.jp real_url = f"https://www.nicovideo.jp/watch/{video_id}" params, soup = get_data(video_id, real_url) if not params or not soup: logger.error(f"{video_id}: Failed to fetch data") return Response("Video not found", status=404) reason_code = params.get('reasonCode', '').upper() if reason_code in ['HIDDEN_VIDEO', 'ADMINISTRATOR_DELETE_VIDEO', 'RIGHT_HOLDER_DELETE_VIDEO', 'DELETED_VIDEO']: logger.warning(f"{video_id}: Video is hidden or deleted ({reason_code}) - returning 404") return Response("Video not found", status=404) thumbnail_url = ( params["video"]["thumbnail"].get("ogp") or params["video"]["thumbnail"].get("player") or params["video"]["thumbnail"].get("largeUrl") or params["video"]["thumbnail"].get("middleUrl") or params["video"]["thumbnail"].get("url") ) if params else None video_width, video_height = get_video_resolution(params) if params else (None, None) download_allowed = True if download_allowed and 'discordbot' not in request_user_agent: logger.info(f"{video_id}: Video download ignored due to user agent ({request_user_agent})") download_allowed = False if params['video']['duration'] > 60 * 20: # 20 minutes logger.info(f"{video_id}: Video download ignored due to duration ({params['video']['duration']} seconds)") download_allowed = False video_quality = get_video_quality(params) if params else None if download_allowed and video_quality is not None: video_in_cdn = is_video_in_cdn(video_id) video_in_progress = is_video_being_downloaded(video_id) if not video_in_cdn and not video_in_progress and s3_client: with download_lock: # Add to queue if not already in it queue_video_ids = [item[0] for item in download_queue] if video_id not in queue_video_ids: download_queue.append((video_id, real_url, video_quality)) logger.info(f"{video_id}: Queued for download") cdn_video_url = get_cdn_url(video_id) og_tags = soup.find_all("meta", attrs={"property": True}) if len(og_tags) == 0: logger.warning(f"{video_id}: No Open Graph tags found") og_title = None og_description = None og_category = None for tag in og_tags: # Remove attribute(s) added by niconico if 'data-server' in tag.attrs: del tag.attrs['data-server'] # Set title if tag.get("property") == "og:title": og_title = tag["content"] # Set description if tag.get("property") == "og:description": og_description = tag["content"] if og_description and og_title: # The description is formatted like "Title [Category] Description" # Extract category (just incase this is useful later), and keep only the description part. match = re.search(rf"^{re.escape(og_title)}(\s+\[(.*?)\])?\s+(.*)", og_description) if match: og_category = match.group(2) if match.group(2) else None og_description = match.group(3) tag["content"] = og_description # Fix thumbnail if tag.get("property") == "og:image" and thumbnail_url: tag["content"] = thumbnail_url # Fix video URL if tag.get("property") == "og:video:url" or tag.get("property") == "og:video:secure_url": tag["content"] = cdn_video_url og_tags_str = "\n".join(str(tag) for tag in og_tags if tag.get("property") not in ["og:site_name"]) og_tags_str += '\n' og_tags_str += f'\n' # Discord seems to ignore video URLs when Twitter meta tags are present, # so in addition to including these when the User Agent is a Twitterbot, # we also include them when the video is too long to download in order to remove the play button. if 'twitterbot' in request_user_agent or not download_allowed: if 'twitterbot' in request_user_agent: logger.info(f"{video_id}: Twitterbot detected - adding Twitter tags") elif not download_allowed: logger.info(f"{video_id}: Video too long to download - will not show play button") og_tags_str += f'\n' og_tags_str += '\n' og_tags_str += '\n' og_tags_str += f'\n' if og_title: og_tags_str += f'\n' if og_description: og_tags_str += f'\n' # og_tags_str += '\n' # og_tags_str += f'\n' # if video_width: # og_tags_str += f'\n' # if video_height: # og_tags_str += f'\n' html_response = f""" {og_tags_str} """ if cache is not None: logger.info(f"{video_id}: Caching HTML response") cache.set(f"{video_id}{cache_html_suffix}", html_response, expire=CACHE_EXPIRATION_HTML) logger.info(f"{video_id}: Returning response") logger.debug(f"{video_id}: HTML response:\n----------\n{html_response}\n----------") return Response(html_response, mimetype="text/html") @app.route("/owoembed") def owoembed(): """ Handles oEmbed requests with parameters in the URL Returns JSON payload in oEmbed format """ logger.info("Received request for oEmbed endpoint") # Get parameters from query string author_id = request.args.get('author_id', '') author_name = request.args.get('author_name', '') video_id = request.args.get('video_id', '') provider = request.args.get('provider', '') author_name_decoded = urllib.parse.unquote(author_name) provider_decoded = urllib.parse.unquote(provider) # Create the author_url and provider_url author_url = f"https://www.nicovideo.jp/user/{author_id}" video_url = f"https://www.nicovideo.jp/watch/{video_id}" # Create oEmbed response oembed_response = { "author_name": author_name_decoded, "author_url": author_url, "provider_name": provider_decoded, "provider_url": video_url, "title": "Embed", "type": "link", "version": "1.0" } logger.info(f"{video_id}: Returning oEmbed response") logger.debug(f"{video_id}: oEmbed response:\n----------\n{json.dumps(oembed_response, indent=2)}\n----------") return jsonify(oembed_response)