import http.cookiejar
import json
import logging
import os
import tempfile
import threading
import time

import boto3
import nndownload
import requests
from botocore.client import Config as BotoConfig
from bs4 import BeautifulSoup
from diskcache import Cache
from dotenv import load_dotenv
from flask import Flask, Response

load_dotenv()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)

S3_BUCKET_NAME = os.environ.get('NICONICOGAY_S3_BUCKET_NAME')
S3_REGION = os.environ.get('NICONICOGAY_S3_REGION')
CDN_BASE_URL = os.environ.get('NICONICOGAY_CDN_BASE_URL')

MAX_CONCURRENT_DOWNLOADS = 3
CACHE_EXPIRATION_SECONDS = 3600  # 1 hour
CACHE_SIZE_LIMIT = 100 * 1024 * 1024  # 100 MB

# Disk cache for rendered HTML; disabled when NICONICOGAY_DISABLE_CACHE is set
cache = (
    None
    if os.environ.get('NICONICOGAY_DISABLE_CACHE', '') != ''
    else Cache("disk_cache", size_limit=CACHE_SIZE_LIMIT)
)

cookie_jar = http.cookiejar.MozillaCookieJar('cookies.txt')
try:
    cookie_jar.load(ignore_discard=True, ignore_expires=True)
except FileNotFoundError:
    logger.warning("cookies.txt not found, starting with empty cookie jar")

s = requests.Session()
s.headers.update({
    "User-Agent": os.environ.get('NICONICOGAY_USER_AGENT', 'Twitterbot/1.0')
})
s.cookies = cookie_jar  # type: ignore

if all(key in os.environ for key in [
    'NICONICOGAY_S3_ACCESS_KEY',
    'NICONICOGAY_S3_SECRET_KEY',
]):
    s3_session = boto3.Session()
    s3_client = s3_session.client(
        's3',
        aws_access_key_id=os.environ['NICONICOGAY_S3_ACCESS_KEY'],
        aws_secret_access_key=os.environ['NICONICOGAY_S3_SECRET_KEY'],
        region_name=S3_REGION,
        endpoint_url=f"https://{S3_REGION}.digitaloceanspaces.com",
        config=BotoConfig(s3={'addressing_style': 'virtual'}),
    )
else:
    logger.warning("S3 credentials not provided, exiting")
    exit(1)
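
# Sample .env for local development (an illustration, not shipped with the
# project): the variable names are the ones this module reads; every value
# below is a placeholder.
#
#   NICONICOGAY_S3_BUCKET_NAME=my-bucket
#   NICONICOGAY_S3_REGION=sfo3
#   NICONICOGAY_CDN_BASE_URL=https://cdn.example.com
#   NICONICOGAY_S3_ACCESS_KEY=...
#   NICONICOGAY_S3_SECRET_KEY=...
#   NICONICOGAY_USER_AGENT=Twitterbot/1.0
#   # Set to any non-empty value to disable the disk cache:
#   # NICONICOGAY_DISABLE_CACHE=1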

download_tracker = {
    'active_downloads': 0,
    'in_progress': set(),
}
download_lock = threading.Lock()
download_queue = []

def download_and_upload_video(video_id, url, video_quality):
    try:
        with download_lock:
            download_tracker['active_downloads'] += 1
            download_tracker['in_progress'].add(video_id)

        # Create the temporary path ourselves rather than holding an open
        # NamedTemporaryFile: nndownload writes to the path on its own, and
        # the finally block below is the single owner of cleanup (an open
        # delete-on-close handle would try to unlink the file a second time).
        fd, temp_path = tempfile.mkstemp(suffix='.mp4')
        os.close(fd)
        try:
            logger.info(f"Starting download for video ID: {video_id}")
            nndownload.execute(
                "--no-login",
                "--user-agent", "Googlebot/2.1",
                "--video-quality", video_quality,
                "--output-path", temp_path,
                url,
            )

            if os.path.exists(temp_path) and s3_client:
                logger.info(f"Downloaded video {video_id}, uploading to CDN")
                try:
                    s3_key = f"niconico/{video_id}.mp4"
                    s3_client.upload_file(
                        temp_path,
                        S3_BUCKET_NAME,
                        s3_key,
                        ExtraArgs={'ContentType': 'video/mp4', 'ACL': 'public-read'},
                    )
                    logger.info(f"Successfully uploaded video {video_id} to CDN")
                    # Clear cache for this video so the next view gets updated HTML
                    if cache:
                        cache.delete(video_id)
                        logger.info(f"Cleared cache for video ID: {video_id}")
                    return True
                except Exception as e:
                    logger.error(f"Error uploading video {video_id} to CDN: {e}")
                    return False
            else:
                logger.error(f"Failed to download video {video_id} or S3 client not configured")
                return False
        finally:
            if os.path.exists(temp_path):
                os.unlink(temp_path)
                logger.info(f"Removed temporary file: {temp_path}")
    except Exception as e:
        logger.error(f"Error in download process for video {video_id}: {e}")
        return False
    finally:
        with download_lock:
            download_tracker['active_downloads'] -= 1
            download_tracker['in_progress'].discard(video_id)

def download_worker():
    while True:
        try:
            with download_lock:
                # Check capacity and pop the next item under a single lock
                # acquisition so the check and the pop cannot race.
                if download_tracker['active_downloads'] < MAX_CONCURRENT_DOWNLOADS:
                    # Pick the next video that is not already being downloaded
                    for i, (video_id, _, _) in enumerate(download_queue):
                        if video_id not in download_tracker['in_progress']:
                            video_info = download_queue.pop(i)
                            threading.Thread(
                                target=download_and_upload_video,
                                args=video_info,
                            ).start()
                            break
            time.sleep(1)
        except Exception as e:
            logger.error(f"Error in download worker: {e}")
            time.sleep(5)  # Back off in case of error

worker_thread = threading.Thread(target=download_worker, daemon=True)
worker_thread.start()

def is_video_in_cdn(video_id):
    """Check whether the video already exists in the CDN"""
    if not s3_client:
        return False
    try:
        s3_client.head_object(Bucket=S3_BUCKET_NAME, Key=f"niconico/{video_id}.mp4")
        return True
    except Exception:
        return False

def is_video_being_downloaded(video_id):
    """Check whether the video is currently being downloaded"""
    with download_lock:
        return video_id in download_tracker['in_progress']

def get_cdn_url(video_id):
    """Get the CDN URL for a video"""
    return f"{CDN_BASE_URL}/niconico/{video_id}.mp4"

def allow_download(params):
    """Only allow downloads for videos up to 15 minutes long"""
    return params['video']['duration'] <= 60 * 15

def get_video_quality(params, quality_level_threshold=3):
    """Get the ID of the best video quality available (below the given quality level threshold)"""
    videos = params['media']['domand']['videos']
    eligible_videos = [v for v in videos if v['qualityLevel'] < quality_level_threshold]
    if not eligible_videos:
        return None
    return str(max(eligible_videos, key=lambda v: int(v['qualityLevel']))['id'])
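
# The "server-response" <meta> tag parsed in the route below carries a JSON
# payload. The shape sketched here is inferred solely from the fields this
# module accesses; the real payload has many more fields and may change.
# All values are illustrative:
#
#   {
#     "data": {
#       "response": {
#         "video": {
#           "duration": 213,  # seconds
#           "thumbnail": {
#             "ogp": "...", "player": "...", "largeUrl": "...",
#             "middleUrl": "...", "url": "..."
#           }
#         },
#         "media": {
#           "domand": {
#             "videos": [{"id": "...", "qualityLevel": 1}]
#           }
#         }
#       }
#     }
#   }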

@app.route("/watch/<video_id>")
def proxy(video_id):
    logger.info(f"Received request for video ID: {video_id}")
    if cache:
        cached_html = cache.get(video_id)
        if cached_html is not None:
            logger.info(f"Using cached response for video ID: {video_id}")
            return Response(cached_html, mimetype="text/html")  # type: ignore

    # Not in cache or cache expired; fetch from nicovideo.jp
    real_url = f"https://www.nicovideo.jp/watch/{video_id}"
    try:
        logger.info(f"Fetching content from URL: {real_url}")
        r = s.get(real_url, timeout=10)
        r.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"Error fetching the page for video ID '{video_id}': {e}")
        return Response(status=500)

    soup = BeautifulSoup(r.text, "html.parser")

    params = None
    thumbnail_url = None
    try:
        server_response = soup.find("meta", {"name": "server-response"})
        if server_response:
            params = json.loads(server_response["content"])["data"]["response"]  # type: ignore
            thumbnail_url = (
                params["video"]["thumbnail"].get("ogp")
                or params["video"]["thumbnail"].get("player")
                or params["video"]["thumbnail"].get("largeUrl")
                or params["video"]["thumbnail"].get("middleUrl")
                or params["video"]["thumbnail"].get("url")
            )
    except (KeyError, json.JSONDecodeError) as e:
        logger.warning(f"Failed to extract thumbnail info for video ID '{video_id}': {e}")

    download_allowed = allow_download(params) if params else False
    video_quality = get_video_quality(params) if params else None
    if download_allowed and video_quality is not None:
        video_in_cdn = is_video_in_cdn(video_id)
        video_in_progress = is_video_being_downloaded(video_id)
        if not video_in_cdn and not video_in_progress and s3_client:
            with download_lock:
                # Add to queue if not already in it
                queue_video_ids = [item[0] for item in download_queue]
                if video_id not in queue_video_ids:
                    download_queue.append((video_id, real_url, video_quality))
                    logger.info(f"Queued video ID {video_id} for download")

    cdn_video_url = get_cdn_url(video_id)
    og_tags = soup.find_all("meta", property=lambda x: x)  # type: ignore
    for tag in og_tags:
        # Fix thumbnail
        if tag.get("property") == "og:image" and thumbnail_url:
            tag["content"] = thumbnail_url
        # Point the video URL at the CDN copy (it may still be uploading; the
        # cache is cleared once the upload finishes, so later fetches pick it up)
        if tag.get("property") in ("og:video:url", "og:video:secure_url"):
            tag["content"] = cdn_video_url

    og_tags_str = "\n".join(str(tag) for tag in og_tags)
    html_response = f"""
{og_tags_str}
"""
    if cache:
        logger.info(f"Caching response for video ID: {video_id}")
        cache.set(video_id, html_response, expire=CACHE_EXPIRATION_SECONDS)
    return Response(html_response, mimetype="text/html")
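
# Dev-only entry point (an assumption: host and port here are illustrative).
# In production a WSGI server such as gunicorn would typically serve `app`;
# this guard is enough to exercise the proxy locally.
if __name__ == "__main__":
    app.run(host="127.0.0.1", port=5000)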