"""Streamlit app for searching JioSaavn and downloading songs, albums and playlists."""

import base64
import html
import logging
import os
import re
import zlib

import requests
import streamlit as st
from Crypto.Cipher import DES
from Crypto.Util.Padding import unpad
from mutagen import File
from mutagen.id3 import APIC, ID3, TALB, TDRC, TIT2, TPE1, ID3NoHeaderError
from mutagen.mp4 import MP4, MP4Cover

# from jiosaavn import JioSaavn
from jiosaavn import album, playlist

logger = logging.getLogger(__name__)

# --- CONFIGURATION ---
DOWNLOAD_DIR = os.path.join(os.getcwd(), "downloads")
os.makedirs(DOWNLOAD_DIR, exist_ok=True)

# Network timeouts in seconds. Without them a stalled connection would hang
# the Streamlit script run indefinitely.
API_TIMEOUT = 10
ARTWORK_TIMEOUT = 5
HEAD_TIMEOUT = 2
DOWNLOAD_TIMEOUT = 30

# Bitrate suffixes that appear in media URLs, best quality first.
QUALITY_TAGS = ("_320", "_160", "_96")

PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"

st.set_page_config(page_title="JioSaavn Pro Downloader", page_icon="🎵", layout="wide")

# --- SESSION STATE SETUP ---
if 'search_results' not in st.session_state:
    st.session_state.search_results = None
if 'link_result' not in st.session_state:
    st.session_state.link_result = None  # Store result from direct link
if 'selected_songs' not in st.session_state:
    st.session_state.selected_songs = []
if 'selected_albums' not in st.session_state:
    st.session_state.selected_albums = []
if 'selected_playlists' not in st.session_state:
    st.session_state.selected_playlists = []


# --- DECRYPTION ENGINE ---
def decrypt_url(encrypted_url):
    """Decrypt a base64-encoded, DES-ECB encrypted media URL.

    Args:
        encrypted_url: Base64 text of the encrypted URL (may be empty/None).

    Returns:
        The decrypted URL as a string, or None when the input is empty or
        cannot be decoded, decrypted, unpadded or read as UTF-8.
    """
    if not encrypted_url:
        return None
    try:
        enc_data = base64.b64decode(encrypted_url)
        cipher = DES.new(b"38346591", DES.MODE_ECB)
        clean_data = unpad(cipher.decrypt(enc_data), DES.block_size)
        return clean_data.decode('utf-8')
    except (ValueError, TypeError):
        # binascii.Error, bad block length, bad padding and UnicodeDecodeError
        # are all ValueError subclasses; TypeError covers non-text input.
        return None


# --- METADATA UPDATER ---
def _fetch_artwork(image_url):
    """Return the raw bytes of the cover image, or None if it cannot be fetched."""
    if not image_url:
        return None
    try:
        response = requests.get(image_url, timeout=ARTWORK_TIMEOUT)
    except requests.RequestException:
        # Artwork is optional; tagging continues without it.
        return None
    return response.content if response.status_code == 200 else None


def add_metadata(filepath, song_details):
    """Write title/artist/album/year tags and cover art into an audio file.

    Supports MP4 containers (.m4a / .mp4) and .mp3 files.

    Args:
        filepath: Path of the audio file to tag.
        song_details: Dict of song fields; reads 'title' or 'song',
            'primary_artists' or 'singers', 'album', 'year' and 'image'.

    Returns:
        True if tags were written, False if the file is missing, has an
        unsupported extension, or tagging failed.
    """
    try:
        if not os.path.exists(filepath):
            return False

        title = html.unescape(song_details.get('title') or song_details.get('song') or '')
        artist = html.unescape(
            song_details.get('primary_artists') or song_details.get('singers') or ''
        )
        album_name = html.unescape(song_details.get('album') or '')
        year = song_details.get('year') or ''
        # `or ''` also covers an explicit None stored under 'image'.
        image_url = (
            (song_details.get('image') or '')
            .replace('50x50', '500x500')
            .replace('150x150', '500x500')
        )

        artwork_data = _fetch_artwork(image_url)
        # Tag the artwork with its real format instead of assuming JPEG.
        is_png = bool(artwork_data) and artwork_data.startswith(PNG_SIGNATURE)

        lowered = filepath.lower()
        if lowered.endswith(('.m4a', '.mp4')):
            audio = File(filepath)
            if audio is None:
                audio = MP4(filepath)

            audio['\xa9nam'] = title       # Title
            audio['\xa9ART'] = artist      # Artist
            audio['\xa9alb'] = album_name  # Album
            if year:
                audio['\xa9day'] = str(year)  # Year
            if artwork_data:
                cover_format = MP4Cover.FORMAT_PNG if is_png else MP4Cover.FORMAT_JPEG
                audio['covr'] = [MP4Cover(artwork_data, imageformat=cover_format)]
            audio.save()
            return True

        if lowered.endswith('.mp3'):
            try:
                audio = ID3(filepath)
            except ID3NoHeaderError:
                # File has no ID3 tag yet: start from an empty one.
                audio = ID3()

            audio.add(TIT2(encoding=3, text=title))       # Title
            audio.add(TPE1(encoding=3, text=artist))      # Artist
            audio.add(TALB(encoding=3, text=album_name))  # Album
            if year:
                audio.add(TDRC(encoding=3, text=str(year)))  # Year
            if artwork_data:
                audio.add(APIC(
                    encoding=3,
                    mime='image/png' if is_png else 'image/jpeg',
                    type=3,  # Cover (front)
                    desc='Cover',
                    data=artwork_data,
                ))
            audio.save(filepath)
            return True

        # Unsupported container: nothing was written.
        return False
    except Exception as e:
        # Tagging is best-effort; a failure must not abort the download.
        logger.warning("Error adding metadata: %s", e)
        return False


# --- SMART DOWNLOADER ---
def _remove_quietly(path):
    """Delete *path* if it exists, ignoring filesystem errors."""
    try:
        os.remove(path)
    except OSError:
        pass  # Nothing to clean up, or cleanup is not possible.


def smart_download(decrypted_url, filename, status_box, folder=DOWNLOAD_DIR, song_details=None):
    """Download a track, trying 320kbps -> 160kbps -> 96kbps, then tag it.

    Each candidate URL is probed with a HEAD request and only fetched when
    the probe answers 200. Data is streamed into a temporary ``.part`` file
    and renamed on success so a failed transfer never leaves a truncated
    file under the final name.

    Args:
        decrypted_url: Media URL as returned by ``decrypt_url``.
        filename: Name of the file to create inside *folder*.
        status_box: Accepted for backward compatibility; not used.
        folder: Destination directory (created if missing).
        song_details: Optional dict passed to ``add_metadata`` after download.

    Returns:
        True if a file was downloaded, False otherwise.
    """
    if not decrypted_url:
        return False

    os.makedirs(folder, exist_ok=True)

    ext = ".m4a" if ".m4a" in decrypted_url else ".mp4"
    base_url = decrypted_url
    for tag in QUALITY_TAGS:
        base_url = base_url.replace(tag + ext, "")

    candidates = [f"{base_url}{tag}{ext}" for tag in QUALITY_TAGS]
    # Fall back to the URL exactly as decrypted in case it does not follow
    # the "<base>_<bitrate><ext>" pattern the variants above are built from.
    if decrypted_url not in candidates:
        candidates.append(decrypted_url)

    headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}

    filepath = os.path.join(folder, filename)
    partial_path = filepath + ".part"

    for final_url in candidates:
        try:
            head = requests.head(final_url, headers=headers, timeout=HEAD_TIMEOUT)
            if head.status_code != 200:
                continue
            with requests.get(
                final_url, stream=True, headers=headers, timeout=DOWNLOAD_TIMEOUT
            ) as r:
                r.raise_for_status()
                with open(partial_path, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
            os.replace(partial_path, filepath)
        except (requests.RequestException, OSError) as exc:
            logger.warning("Download failed for %s: %s", final_url, exc)
            _remove_quietly(partial_path)
            continue

        # Add metadata after successful download
        if song_details:
            add_metadata(filepath, song_details)
        return True

    return False


# --- API CLIENT ---
class JioSaavnAPI:
    """Thin client for the JioSaavn ``api.php`` JSON endpoint."""

    def __init__(self):
        self.base_url = "https://www.jiosaavn.com/api.php"
        self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) Chrome/91.0.4472.124 Safari/537.36"}

    def _get(self, params):
        """Call the API with *params* and return the decoded JSON, or None.

        The caller's dict is not modified. None is returned on any network
        error, non-200 status or undecodable body.
        """
        query = {**params, "_format": "json", "_marker": "0", "ctx": "web6dot0"}
        try:
            res = requests.get(
                self.base_url, params=query, headers=self.headers, timeout=API_TIMEOUT
            )
            if res.status_code == 200:
                return res.json()
        except (requests.RequestException, ValueError) as exc:
            logger.warning("JioSaavn API call failed: %s", exc)
        return None

    def search_all(self, query):
        """Run an autocomplete search and return the raw response (or None)."""
        return self._get({"__call": "autocomplete.get", "query": query})

    def get_song_details(self, id):
        """Fetch details for a song id.

        Returns the first entry when the response is (or wraps, under
        'songs') a list, the response itself when it is another shape, and
        None when the request failed or the list is empty.
        """
        d = self._get({"__call": "song.getDetails", "pids": id})
        if isinstance(d, dict) and 'songs' in d:
            d = d['songs']
        if isinstance(d, list):
            return d[0] if d else None
        return d

    # NEW: Fetch song details using the Token from the URL (not the PID)
    def get_song_by_token(self, token):
        """Fetch song details using the token taken from a song URL."""
        d = self._get({"__call": "webapi.get", "token": token, "type": "song"})
        # The response is handled both as a dict wrapping a 'songs' list
        # and as a bare list.
        if isinstance(d, dict) and 'songs' in d:
            d = d['songs']
        if isinstance(d, list):
            return d[0] if d else None
        return d

    def get_album_details(self, id):
        """Fetch album details (including its songs) by album id."""
        return self._get({"__call": "content.getAlbumDetails", "albumid": id})

    def get_album_details_link(self, id):
        """Resolve an album URL to its album id and fetch the album details.

        Returns None when the URL cannot be resolved.
        """
        try:
            info = album(url=id)
        except Exception:
            # Third-party lookup; its failure modes are not known here.
            logger.exception("Could not resolve album link %s", id)
            return None
        album_id = info.get('albumId') if isinstance(info, dict) else None
        if not album_id:
            return None
        return self._get({"__call": "content.getAlbumDetails", "albumid": album_id})

    def get_playlist_details(self, id):
        """Fetch playlist details (including its songs) by list id."""
        logger.debug("Fetching playlist id: %s", id)
        return self._get({"__call": "playlist.getDetails", "listid": id})

    def get_playlist_details_link(self, id):
        """Resolve a playlist URL to its list id and fetch the playlist details.

        Returns None when the URL cannot be resolved.
        """
        try:
            info = playlist(url=id)
        except Exception:
            # Third-party lookup; its failure modes are not known here.
            logger.exception("Could not resolve playlist link %s", id)
            return None
        list_id = info.get('id') if isinstance(info, dict) else None
        if not list_id:
            return None
        return self._get({"__call": "playlist.getDetails", "listid": list_id})


api = JioSaavnAPI()


# --- URL PARSER ---
def process_jiosaavn_link(url):
    """Identify whether *url* is a JioSaavn song, album or playlist link.

    The token is the last path segment; surrounding whitespace, a query
    string, a fragment and trailing slashes are ignored.

    Returns:
        (type_string, token_string) where type_string is "song", "album" or
        "playlist", or (None, None) when *url* is not a recognised link.
    """
    if not isinstance(url, str):
        return None, None

    path = url.strip().split("?", 1)[0].split("#", 1)[0].rstrip("/")

    if "jiosaavn.com/song/" in path:
        # Pattern: .../song/name/TOKEN
        link_type = "song"
    elif "jiosaavn.com/album/" in path:
        # Pattern: .../album/name/TOKEN
        link_type = "album"
    elif "jiosaavn.com/featured/" in path or "jiosaavn.com/s/playlist/" in path:
        # Pattern: .../featured/name/TOKEN
        link_type = "playlist"
    else:
        return None, None

    token = path.split("/")[-1]
    if not token:
        return None, None
    return link_type, token


# --- UI LOGIC ---
st.title("🎵 JioSaavn Multi-Downloader")

# 1. Search / Link Input Section
fetch_error = None
with st.container():
    col1, col2 = st.columns([4, 1])
    with col1:
        query_input = st.text_input(
            "Search Query OR Paste JioSaavn Link",
            key="search_box",
            placeholder="e.g. 'Arijit Singh' or 'https://www.jiosaavn.com/song/...'",
        )
    with col2:
        # Two empty writes push the button down to line up with the input.
        st.write("")
        st.write("")
        if st.button("🚀 Go", use_container_width=True):
            if query_input:
                # Reset previous states
                st.session_state.search_results = None
                st.session_state.link_result = None
                st.session_state.selected_songs = []
                st.session_state.selected_albums = []
                st.session_state.selected_playlists = []

                query = query_input.strip()

                # Check if it's a URL
                link_type, token = process_jiosaavn_link(query)

                if link_type and token:
                    with st.spinner(f"Fetching {link_type} details..."):
                        if link_type == "song":
                            data = api.get_song_by_token(token)
                        elif link_type == "album":
                            data = api.get_album_details_link(query)
                        else:
                            data = api.get_playlist_details_link(query)

                    if data:
                        # Results are stored in a one-element list so the
                        # display code can treat every link type uniformly.
                        st.session_state.link_result = {"type": link_type, "data": [data]}
                    else:
                        fetch_error = f"Could not fetch {link_type} details from that link."
                else:
                    # It's a normal search text
                    with st.spinner("Searching..."):
                        st.session_state.search_results = api.search_all(query)

# Shown outside the narrow button column so the message is readable.
if fetch_error:
    st.error(fetch_error)

# 2. Display Results
# --- Helpers for the result display and download sections ---
def _thumb_url(item):
    """Return the item's artwork URL with a 50x50 size swapped for 150x150."""
    return (item.get('image') or '').replace('50x50', '150x150')


def _render_search_tab(items, key_prefix, selected, empty_message):
    """Render one tab of search results with a "Select" checkbox per row.

    Args:
        items: List of result dicts for this tab.
        key_prefix: Prefix for the checkbox widget keys ("s", "a" or "p").
        selected: Session-state list that ticked items are added to and
            unticked items are removed from (mutated in place).
        empty_message: Text shown when *items* is empty.
    """
    if not items:
        st.info(empty_message)

    for position, entry in enumerate(items):
        with st.container():
            c1, c2, c3 = st.columns([1, 5, 1])
            with c1:
                img = _thumb_url(entry)
                if img:
                    st.image(img, width=60)
            with c2:
                st.markdown(f"**{html.unescape(entry.get('title') or 'Unknown')}**")
                st.caption(html.unescape(entry.get('description') or ''))
            with c3:
                checked = st.checkbox("Select", key=f"{key_prefix}_{entry.get('id', position)}")
                if checked and entry not in selected:
                    selected.append(entry)
                elif not checked and entry in selected:
                    selected.remove(entry)
            st.divider()


def _safe_filename(name):
    """Return *name* with characters that are invalid in file names replaced by '-'."""
    cleaned = re.sub(r'[\\/:*?"<>|]', "-", name).strip().rstrip(".")
    return cleaned or "untitled"


def _embedded_tracks(item, *keys):
    """Return the first list under *keys* whose entries carry media URLs.

    Items fetched from a direct link already include their tracks, so the
    download step can use them instead of calling the API again. Returns
    None when no such list is present.
    """
    for key in keys:
        tracks = item.get(key)
        if isinstance(tracks, list) and any(
            isinstance(t, dict) and 'encrypted_media_url' in t for t in tracks
        ):
            return tracks
    return None


def _download_tracks(tracks, folder, status):
    """Download every track in *tracks* into *folder*.

    Returns the names of the tracks that could not be downloaded.
    """
    failures = []
    for track in tracks:
        if not isinstance(track, dict):
            continue
        track_name = html.unescape(track.get('song') or track.get('title') or 'Unknown')
        status.write(f"Downloading: {track_name}")
        encrypted = track.get('encrypted_media_url')
        if not encrypted:
            failures.append(track_name)
            continue
        fname = f"{_safe_filename(track_name)}.m4a"
        if not smart_download(decrypt_url(encrypted), fname, st, folder, track):
            failures.append(track_name)
    return failures


# A. LINK RESULT (Direct Link)
if st.session_state.link_result:
    res = st.session_state.link_result
    st.markdown(f"### 🔗 Link Detected: {res['type'].capitalize()}")

    # Items that arrive through a direct link are queued automatically.
    queue_by_type = {
        'song': st.session_state.selected_songs,
        'album': st.session_state.selected_albums,
        'playlist': st.session_state.selected_playlists,
    }
    target_queue = queue_by_type.get(res['type'])

    for item in res['data']:
        with st.container():
            c1, c2, c3 = st.columns([1, 5, 1])
            with c1:
                img = _thumb_url(item)
                if img:
                    st.image(img, width=80)
            with c2:
                title = html.unescape(
                    item.get('title') or item.get('song') or item.get('listname') or "Unknown"
                )
                st.markdown(f"**{title}**")
                desc = html.unescape(item.get('description') or item.get('subtitle') or "")
                st.caption(desc)
            with c3:
                if target_queue is not None:
                    if item not in target_queue:
                        target_queue.append(item)
                    st.success("Ready")
            st.divider()

# B. SEARCH RESULTS (Normal Search)
elif st.session_state.search_results:
    res = st.session_state.search_results

    tab1, tab2, tab3 = st.tabs(["🎵 Songs", "💿 Albums", "Cc Playlists"])

    # --- SONGS TAB ---
    with tab1:
        _render_search_tab(
            (res.get("songs") or {}).get("data") or [],
            "s",
            st.session_state.selected_songs,
            "No songs found.",
        )

    # --- ALBUMS TAB ---
    with tab2:
        _render_search_tab(
            (res.get("albums") or {}).get("data") or [],
            "a",
            st.session_state.selected_albums,
            "No albums found.",
        )

    # --- PLAYLISTS TAB ---
    with tab3:
        _render_search_tab(
            (res.get("playlists") or {}).get("data") or [],
            "p",
            st.session_state.selected_playlists,
            "No playlists found.",
        )

# 3. Download Action Section
st.markdown("### 📥 Download Queue")

selected_songs = st.session_state.selected_songs
selected_albums = st.session_state.selected_albums
selected_playlists = st.session_state.selected_playlists
total_items = len(selected_songs) + len(selected_albums) + len(selected_playlists)

if total_items > 0:
    st.success(f"Selected: {len(selected_songs)} Songs, {len(selected_albums)} Albums, {len(selected_playlists)} Playlists")

    if st.button(f"Start Download ({total_items})", type="primary"):
        status_text = st.empty()
        progress_bar = st.progress(0)
        failed = []      # Names of tracks that could not be downloaded.
        finished = 0     # Queue entries processed so far, drives the bar.

        # --- PROCESS INDIVIDUAL SONGS ---
        for song_item in selected_songs:
            # Search objects use 'title', direct-link objects use 'song'.
            name = html.unescape(song_item.get('title') or song_item.get('song') or 'Unknown')
            status_text.write(f"Processing Song: {name}...")

            if 'encrypted_media_url' in song_item:
                # Direct link objects already carry the encrypted media URL.
                details = song_item
            else:
                # It's a search result, need to fetch details
                details = api.get_song_details(song_item.get('id'))
                if isinstance(details, list):
                    details = details[0] if details else None

            if isinstance(details, dict) and details.get('encrypted_media_url'):
                fname = f"{_safe_filename(name)}.m4a"
                dec_url = decrypt_url(details['encrypted_media_url'])
                if not smart_download(dec_url, fname, st, DOWNLOAD_DIR, details):
                    failed.append(name)
            else:
                st.error(f"Failed to fetch details for {name}")
                failed.append(name)

            finished += 1
            progress_bar.progress(finished * 100 // total_items)

        # --- PROCESS ALBUMS ---
        # The loop variable is deliberately not called `album`: that name is
        # the function imported from the jiosaavn package.
        for album_item in selected_albums:
            album_name = html.unescape(album_item.get('title') or 'Unknown Album')
            status_text.write(f"Processing Album: {album_name}...")
            album_folder = os.path.join(DOWNLOAD_DIR, _safe_filename(album_name))

            song_list = _embedded_tracks(album_item, 'songs')
            if song_list is None:
                album_id = album_item.get('albumid') or album_item.get('id')
                details = api.get_album_details(album_id) if album_id else None
                song_list = details.get("songs") or [] if isinstance(details, dict) else []

            if not song_list:
                st.error(f"Failed to fetch details for {album_name}")
            failed.extend(_download_tracks(song_list, album_folder, status_text))

            finished += 1
            progress_bar.progress(finished * 100 // total_items)

        # --- PROCESS PLAYLISTS ---
        for pl in selected_playlists:
            pl_name = html.unescape(pl.get('listname') or pl.get('title') or 'Unknown Playlist')
            status_text.write(f"Processing Playlist: {pl_name}...")
            pl_folder = os.path.join(DOWNLOAD_DIR, _safe_filename(pl_name))

            song_list = _embedded_tracks(pl, 'list', 'songs')
            if song_list is None:
                pl_id = pl.get('listid') or pl.get('id')
                details = api.get_playlist_details(pl_id) if pl_id else None
                song_list = details.get("songs") or [] if isinstance(details, dict) else []

            if not song_list:
                st.error(f"Failed to fetch details for {pl_name}")
            failed.extend(_download_tracks(song_list, pl_folder, status_text))

            finished += 1
            progress_bar.progress(finished * 100 // total_items)

        progress_bar.progress(100)
        if failed:
            status_text.warning(
                f"Finished with {len(failed)} failed download(s): " + ", ".join(failed)
            )
        else:
            status_text.success("All downloads complete!")

        # Clear queue after download? Optional
        # st.session_state.selected_songs = []
else:
    st.info("Paste a link or search for items to download.")