Files
JioDownloader/app.py
sam a0ed0b4a4d
Some checks failed
Docker Deploy / build-and-deploy (push) Failing after 35s
adjust
2026-02-06 17:25:41 +00:00

490 lines
20 KiB
Python

import streamlit as st
import requests
import os
import base64
import html
import re
from Crypto.Cipher import DES
from Crypto.Util.Padding import unpad
import base64
import zlib
from mutagen.mp4 import MP4, MP4Cover
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TDRC, APIC
from mutagen import File
# from jiosaavn import JioSaavn
from jiosaavn import album,playlist
# --- CONFIGURATION ---
DOWNLOAD_DIR = os.path.join(os.getcwd(), "downloads")
if not os.path.exists(DOWNLOAD_DIR):
os.makedirs(DOWNLOAD_DIR)
st.set_page_config(page_title="JioSaavn Downloader", page_icon="🎵", layout="wide")
# --- SESSION STATE SETUP ---
if 'search_results' not in st.session_state:
st.session_state.search_results = None
if 'link_result' not in st.session_state:
st.session_state.link_result = None # Store result from direct link
if 'selected_songs' not in st.session_state:
st.session_state.selected_songs = []
if 'selected_albums' not in st.session_state:
st.session_state.selected_albums = []
if 'selected_playlists' not in st.session_state:
st.session_state.selected_playlists = []
# --- DECRYPTION ENGINE ---
def decrypt_url(encrypted_url):
try:
if not encrypted_url: return None
enc_data = base64.b64decode(encrypted_url)
key = b"38346591"
cipher = DES.new(key, DES.MODE_ECB)
decrypted_data = cipher.decrypt(enc_data)
clean_data = unpad(decrypted_data, DES.block_size)
return clean_data.decode('utf-8')
except Exception as e:
return None
# --- METADATA UPDATER ---
def add_metadata(filepath, song_details):
"""
Adds metadata to audio file (supports both .m4a and .mp3)
"""
try:
if not os.path.exists(filepath):
return False
# Extract metadata from song details
title = html.unescape(song_details.get('title') or song_details.get('song') or '')
artist = html.unescape(song_details.get('primary_artists') or song_details.get('singers') or '')
album_name = html.unescape(song_details.get('album') or '')
year = song_details.get('year') or ''
image_url = song_details.get('image', '').replace('50x50', '500x500').replace('150x150', '500x500')
# Download album artwork
artwork_data = None
if image_url:
try:
img_response = requests.get(image_url, timeout=5)
if img_response.status_code == 200:
artwork_data = img_response.content
except:
pass
# Load the audio file
audio = File(filepath)
if filepath.endswith('.m4a') or filepath.endswith('.mp4'):
# Handle M4A/MP4 files
if audio is None:
audio = MP4(filepath)
audio['\xa9nam'] = title # Title
audio['\xa9ART'] = artist # Artist
audio['\xa9alb'] = album_name # Album
if year:
audio['\xa9day'] = str(year) # Year
# Add artwork
if artwork_data:
audio['covr'] = [MP4Cover(artwork_data, imageformat=MP4Cover.FORMAT_JPEG)]
audio.save()
elif filepath.endswith('.mp3'):
# Handle MP3 files
try:
audio = ID3(filepath)
except:
audio = ID3()
audio.add(TIT2(encoding=3, text=title)) # Title
audio.add(TPE1(encoding=3, text=artist)) # Artist
audio.add(TALB(encoding=3, text=album_name)) # Album
if year:
audio.add(TDRC(encoding=3, text=str(year))) # Year
# Add artwork
if artwork_data:
audio.add(APIC(
encoding=3,
mime='image/jpeg',
type=3, # Cover (front)
desc='Cover',
data=artwork_data
))
audio.save(filepath)
return True
except Exception as e:
print(f"Error adding metadata: {e}")
return False
# --- SMART DOWNLOADER ---
def smart_download(decrypted_url, filename, status_box, folder=DOWNLOAD_DIR, song_details=None):
"""Tries 320kbps -> 160kbps -> 96kbps and adds metadata"""
if not decrypted_url: return False
if not os.path.exists(folder):
os.makedirs(folder)
ext = ".mp4"
if ".m4a" in decrypted_url: ext = ".m4a"
base_url = decrypted_url.replace("_96" + ext, "").replace("_160" + ext, "").replace("_320" + ext, "")
qualities = [("_320", "320kbps"), ("_160", "160kbps"), ("_96", "96kbps")]
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
for q_tag, label in qualities:
final_url = f"{base_url}{q_tag}{ext}"
try:
head = requests.head(final_url, headers=headers, timeout=2)
if head.status_code == 200:
filepath = os.path.join(folder, filename)
with requests.get(final_url, stream=True, headers=headers) as r:
r.raise_for_status()
with open(filepath, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)
# Add metadata after successful download
if song_details:
add_metadata(filepath, song_details)
return True
except:
continue
return False
# --- API CLIENT ---
class JioSaavnAPI:
def __init__(self):
self.base_url = "https://www.jiosaavn.com/api.php"
self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0.4472.124 Safari/537.36"}
def _get(self, params):
params.update({"_format": "json", "_marker": "0", "ctx": "web6dot0"})
try:
res = requests.get(self.base_url, params=params, headers=self.headers)
if res.status_code == 200: return res.json()
except: return None
return None
def search_all(self, query):
return self._get({"__call": "autocomplete.get", "query": query})
def get_song_details(self, id):
d = self._get({"__call": "song.getDetails", "pids": id})
if 'songs' in d: d = d['songs']
if isinstance(d, list): return d[0]
print('=== Song details ===\n',d)
return d
# NEW: Fetch song details using the Token from the URL (not the PID)
def get_song_by_token(self, token):
d = self._get({"__call": "webapi.get", "token": token, "type": "song"})
# webapi.get usually returns the 'songs' list directly or a dict
if isinstance(d, dict) and 'songs' in d:
return d['songs'][0]
if isinstance(d, list) and len(d) > 0:
return d[0]
return d
def get_album_details(self, id):
return self._get({"__call": "content.getAlbumDetails", "albumid": id})
def get_album_details_link(self,id):
# print(f"Decoding album id from link token: {id}")
albumid = album(url=id)['albumId']
# print(f"Decoded album id: {albumid}")
return self._get({"__call": "content.getAlbumDetails", "albumid": albumid})
def get_playlist_details(self, id):
print(f"Fetching playlist id: {id}")
return self._get({"__call": "playlist.getDetails", "listid": id})
def get_playlist_details_link(self,id):
print(f"Decoding playlist id from link token: {id}")
listid = playlist(url=id)
print(f"Decoded playlist id: {listid}")
return self._get({"__call": "playlist.getDetails", "listid": listid['id']})
api = JioSaavnAPI()
# --- URL PARSER ---
def process_jiosaavn_link(url):
"""
Identifies if the URL is a Song, Album, or Playlist and extracts the token.
Returns: (type_string, token_string) or (None, None)
"""
try:
if "jiosaavn.com/song/" in url:
# Pattern: .../song/name/TOKEN
token = url.split("/")[-1]
return "song", token
elif "jiosaavn.com/album/" in url:
# Pattern: .../album/name/TOKEN
token = url.split("/")[-1]
return "album", token
elif "jiosaavn.com/featured/" in url or "jiosaavn.com/s/playlist/" in url:
# Pattern: .../featured/name/TOKEN
token = url.split("/")[-1]
return "playlist", token
return None, None
except:
return None, None
# --- UI LOGIC ---
st.title("🎵 JioSaavn Downloader")
# 1. Search / Link Input Section
with st.container():
col1, col2 = st.columns([4, 1])
with col1:
query_input = st.text_input("Search Query OR Paste JioSaavn Link", key="search_box", placeholder="e.g. 'Arijit Singh' or 'https://www.jiosaavn.com/song/...'")
with col2:
st.write("")
st.write("")
if st.button("🚀 Go", use_container_width=True):
if query_input:
# Reset previous states
st.session_state.search_results = None
st.session_state.link_result = None
st.session_state.selected_songs = []
st.session_state.selected_albums = []
st.session_state.selected_playlists = []
# Check if it's a URL
link_type, token = process_jiosaavn_link(query_input)
if link_type and token:
with st.spinner(f"Fetching {link_type} details..."):
if link_type == "song":
# For songs, we fetch details immediately and put into a list wrapper
data = api.get_song_by_token(token)
if data:
st.session_state.link_result = {"type": "song", "data": [data]}
elif link_type == "album":
data = api.get_album_details_link(query_input)
# print('='*80,'Album data:', data)
if data:
st.session_state.link_result = {"type": "album", "data": [data]}
elif link_type == "playlist":
data = api.get_playlist_details_link(query_input)
if data:
st.session_state.link_result = {"type": "playlist", "data": [data]}
else:
# It's a normal search text
with st.spinner("Searching..."):
st.session_state.search_results = api.search_all(query_input)
# 2. Display Results
# A. LINK RESULT (Direct Link)
if st.session_state.link_result:
res = st.session_state.link_result
st.markdown(f"### 🔗 Link Detected: {res['type'].capitalize()}")
data_list = res['data']
for item in data_list:
with st.container():
c1, c2, c3 = st.columns([1, 5, 1])
with c1:
img = item.get('image', '').replace('50x50', '150x150')
if img:
st.image(img, width=80)
with c2:
title = html.unescape(item.get('title') or item.get('song') or item.get('listname') or "Unknown")
st.markdown(f"**{title}**")
desc = html.unescape(item.get('description') or item.get('subtitle') or "")
st.caption(desc)
with c3:
# Auto-select logic for links
# We use a button that acts as a toggle or just pre-add them
if res['type'] == 'song':
if item not in st.session_state.selected_songs:
st.session_state.selected_songs.append(item)
st.success("Ready")
elif res['type'] == 'album':
if item not in st.session_state.selected_albums:
st.session_state.selected_albums.append(item)
st.success("Ready")
elif res['type'] == 'playlist':
if item not in st.session_state.selected_playlists:
st.session_state.selected_playlists.append(item)
st.success("Ready")
st.divider()
# B. SEARCH RESULTS (Normal Search)
elif st.session_state.search_results:
res = st.session_state.search_results
tab1, tab2, tab3 = st.tabs(["🎵 Songs", "💿 Albums", "Cc Playlists"])
# --- SONGS TAB ---
with tab1:
songs = res.get("songs", {}).get("data", [])
if not songs: st.info("No songs found.")
for song in songs:
with st.container():
c1, c2, c3 = st.columns([1, 5, 1])
with c1:
img = song.get('image', '').replace('50x50', '150x150')
if img:
st.image(img, width=60)
with c2:
st.markdown(f"**{html.unescape(song['title'])}**")
st.caption(html.unescape(song['description']))
with c3:
if st.checkbox("Select", key=f"s_{song['id']}"):
if song not in st.session_state.selected_songs:
st.session_state.selected_songs.append(song)
else:
if song in st.session_state.selected_songs:
st.session_state.selected_songs.remove(song)
st.divider()
# --- ALBUMS TAB ---
with tab2:
albums = res.get("albums", {}).get("data", [])
# print('=== Albums ===\n', albums)
if not albums: st.info("No albums found.")
for album in albums:
with st.container():
c1, c2, c3 = st.columns([1, 5, 1])
with c1:
img = album.get('image', '').replace('50x50', '150x150')
if img:
st.image(img, width=60)
with c2:
st.markdown(f"**{html.unescape(album['title'])}**")
st.caption(html.unescape(album['description']))
with c3:
if st.checkbox("Select", key=f"a_{album['id']}"):
if album not in st.session_state.selected_albums:
st.session_state.selected_albums.append(album)
else:
if album in st.session_state.selected_albums:
st.session_state.selected_albums.remove(album)
st.divider()
# --- PLAYLISTS TAB ---
with tab3:
playlists = res.get("playlists", {}).get("data", [])
if not playlists: st.info("No playlists found.")
for pl in playlists:
with st.container():
c1, c2, c3 = st.columns([1, 5, 1])
with c1:
img = pl.get('image', '').replace('50x50', '150x150')
if img:
st.image(img, width=60)
with c2:
st.markdown(f"**{html.unescape(pl['title'])}**")
st.caption(html.unescape(pl['description']))
with c3:
if st.checkbox("Select", key=f"p_{pl['id']}"):
if pl not in st.session_state.selected_playlists:
st.session_state.selected_playlists.append(pl)
else:
if pl in st.session_state.selected_playlists:
st.session_state.selected_playlists.remove(pl)
st.divider()
# 3. Download Action Section
st.markdown("### 📥 Download Queue")
total_items = len(st.session_state.selected_songs) + len(st.session_state.selected_albums) + len(st.session_state.selected_playlists)
if total_items > 0:
st.success(f"Selected: {len(st.session_state.selected_songs)} Songs, {len(st.session_state.selected_albums)} Albums, {len(st.session_state.selected_playlists)} Playlists")
if st.button(f"Start Download ({total_items})", type="primary"):
status_text = st.empty()
progress_bar = st.progress(0)
# --- PROCESS INDIVIDUAL SONGS ---
for i, item in enumerate(st.session_state.selected_songs):
title = item.get('title') or item.get('song') # Handle difference between search obj and direct obj
name = html.unescape(title)
status_text.write(f"Processing Song: {name}...")
print(f"Processing Song: {item}...")
# Direct link objects usually have the encrypted_media_url already
if 'encrypted_media_url' in item:
details = item
else:
# It's a search result, need to fetch details
details = api.get_song_details(item['id'])
if isinstance(details, list):
details = details[0]
if details and 'encrypted_media_url' in details:
print('Downloading song:',name)
fname = f"{name}.m4a".replace("/", "-")
dec_url = decrypt_url(details['encrypted_media_url'])
smart_download(dec_url, fname, st, DOWNLOAD_DIR, details)
else:
st.error(f"Failed to fetch details for {name}")
# --- PROCESS ALBUMS ---
for album in st.session_state.selected_albums:
album_name = html.unescape(album['title'])
status_text.write(f"Processing Album: {album_name}...")
album_folder = os.path.join(DOWNLOAD_DIR, album_name.replace("/", "-"))
# If we got the album via link, we might already have the 'list' (songs)
if 'albumid' in album:
details = api.get_album_details(album['albumid'])
song_list = details.get("songs", []) if 'songs' in details else []
else:
# print(f"Fetching details for album ID: {album}")
details = api.get_album_details(album['id'])
song_list = details.get("songs", []) if 'songs' in details else []
for s in song_list:
s_name = html.unescape(s['song'])
status_text.write(f"Downloading: {s_name}")
if 'encrypted_media_url' in s:
fname = f"{s_name}.m4a".replace("/", "-")
dec_url = decrypt_url(s['encrypted_media_url'])
smart_download(dec_url, fname, st, album_folder, s)
# --- PROCESS PLAYLISTS ---
for pl in st.session_state.selected_playlists:
try:
pl_name = html.unescape(pl['listname'])
except Exception as e:
pl_name = html.unescape(pl['title'])
status_text.write(f"Processing Playlist: {pl_name}...")
pl_folder = os.path.join(DOWNLOAD_DIR, pl_name.replace("/", "-"))
print('Playlist===========================/n',pl)
if 'list' in pl:
song_list = pl['list']
elif 'listid' in pl:
details = api.get_playlist_details(pl['listid'])
song_list = details.get("songs", []) if details else []
else:
details = api.get_playlist_details(pl['id'])
song_list = details.get("songs", []) if details else []
for s in song_list:
s_name = html.unescape(s['song'])
status_text.write(f"Downloading: {s_name}")
if 'encrypted_media_url' in s:
fname = f"{s_name}.m4a".replace("/", "-")
dec_url = decrypt_url(s['encrypted_media_url'])
smart_download(dec_url, fname, st, pl_folder, s)
progress_bar.progress(100)
status_text.success("All downloads complete!")
# Clear queue after download? Optional
# st.session_state.selected_songs = []
else:
st.info("Paste a link or search for items to download.")