Commit 88ac84ba authored by Bernhard Geier

rewrite in Python3

- Store more metadata in ID3 tags
- No more accidental downloads of "Zündfunk Generator" (has its own official podcast)
parent 8c9025aa
# Zündfunk Download

Bayerischer Rundfunk airs a pretty decent radio show called "Zündfunk", featuring new music, politics and culture.
For people who missed a show, Bayerischer Rundfunk provides recordings on its web page.
But only for less than one week. And only within a player, without a convenient download button.

That's why I wrote this Python 3 script.

It is a simple command line tool that downloads all currently available Zündfunk episodes from Bayerischer Rundfunk's web page as MP3 files.
### Requirements
Python 3 with the modules "mutagen", "urllib3" and "requests".
(On Debian/Ubuntu: `sudo apt install python3 python3-mutagen python3-urllib3 python3-requests`)
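If your distribution does not package these modules, installing them via pip should work as well:

```pip3 install mutagen urllib3 requests```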
### Usage
```./ <TargetDirectory>```
The script searches Bayerischer Rundfunk's "Zündfunk" web site for recordings and downloads all currently available episodes into the given target directory.
Files already present get skipped, so the script is well suited for cron jobs.
The show's metadata gets stored in the downloaded MP3 files' ID3 tags (see below).

Example:

```./ Downloads/Zündfunk```

This downloads all available Zündfunk episodes and saves them with correct ID3 tags in the "Downloads/Zündfunk" directory.
The downloaded files get named after the show's date and title (e.g. "Zündfunk 2017-10-19 - Wahlen in Tschechien _ Das Mode-Comeback der Logos _ Band Interview Fink.mp3"); recordings already downloaded in a previous run get skipped.
The ID3v2 tags include the artist ("Zündfunk"), the show's title, and the show's description as a comment (if available).
To create a personal archive of all Zündfunk shows, just run this script once a day, e.g. with a cronjob like the one shown below.
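A crontab entry for this could look as follows (the script and target paths are placeholders, adjust them to your setup):

```0 20 * * * /path/to/script.py /path/to/Downloads/Zündfunk```

This fetches any new episodes once a day at 20:00.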
#!/usr/bin/perl

use strict;
use warnings;
use WWW::Mechanize;
use HTML::TreeBuilder;
use XML::LibXML;
use JSON;
use MP3::Tag;
MP3::Tag->config(write_v24 => 1);
use File::Spec;
use utf8;
my $DESTDIR=$ARGV[0];
die ($0." <directory>\n") unless ($DESTDIR);
die ($DESTDIR." does not exist") unless ( -d $DESTDIR);

my $url="";

my $browser=WWW::Mechanize->new();
$browser->get($url) or die($!);
# On the page $url you can click through the most recent and upcoming Zündfunk broadcasts; the data behind it comes from a JSON file
my $tree=HTML::TreeBuilder->new_from_content($browser->content());
my $programDiv=$tree->look_down('_tag'=>'div','id'=>'program_stage');
my ($programJSON)=$programDiv->attr('class')=~/jsonUrl:\'(.+)\'/;
# walk through the JSON, looking at every available broadcast
$browser->get($programJSON) or die($!);
my $decodedProgramJSON=JSON::decode_json($browser->content);
foreach (@{$decodedProgramJSON->{'channelBroadcasts'}}) {
next unless (($_->{'broadcastStartDate'}) and ($_->{'broadcastEndDate'})); # broadcast is still in the future
my ($url)=$_->{'broadcastHtml'}=~/<a href=\"(.+?)\" title=\"/ or next; # URL of the broadcast's page
$browser->get($url) or die($!); # open the broadcast's page
# the broadcast's page either embeds a player directly, has one on one or more subpages, or has none at all
my @possibleAudioPagesUrls=($url,map{ $_->url() } $browser->find_all_links('class_regex'=>qr/link_audio/));
my @xmlUrls;
foreach my $audioUrl (@possibleAudioPagesUrls) {
	$browser->get($audioUrl) or next;
	my ($xmlUrl)=$browser->content()=~/dataURL:\'(\/.+xml)\'/;
	push(@xmlUrls,$xmlUrl) if ($xmlUrl);
}
next unless (@xmlUrls);
# check every XML file found for the broadcast for audio links
my %longestAudio=('durationInSeconds'=>0);
foreach my $xmlUrl (@xmlUrls) {
	$browser->get($xmlUrl) or next;
	my $dom=XML::LibXML->load_xml(string=>$browser->content);
	# duration is HH:MM:SS, for shorter recordings sometimes MM:SS
	my $duration=$dom->findvalue('playlist/audio/duration');
	my ($h,$m,$s)=split(/:/,$duration);
	my $durationInSeconds=$s+($m*60)+($h*60*60);
	# the duration must be at least 45 minutes (=2700 seconds); we keep the longest audio file
	if (($durationInSeconds >= 2700) and ($durationInSeconds > $longestAudio{'durationInSeconds'})) {
		$longestAudio{'durationInSeconds'} = $durationInSeconds;
		$longestAudio{'description'} = $dom->findvalue('playlist/audio/desc');
		$longestAudio{'broadcastDate'} = $dom->findvalue('playlist/audio/broadcastDate');
		$longestAudio{'title'} = $dom->findvalue('playlist/audio/title');
		foreach ($dom->findnodes('playlist/audio/assets/asset/downloadUrl')) {
			if ($_->to_literal=~/mp3$/) {
				$longestAudio{'downloadUrl'}=$_->to_literal;
				last;
			}
		}
	}
}
next unless ($longestAudio{'downloadUrl'});
# if something was found: download it
my $filename="Zündfunk ".join("-",reverse(split(/\./,$longestAudio{'broadcastDate'})))." - ".substr($longestAudio{'title'},0,80).".mp3";
my $file=File::Spec->join($DESTDIR,$filename);
if (-f $file) {
	print "File ".$filename." does already exist, skipping\n";
	next;
}
print "Downloading ".$filename."... ";
# download with resume support, retrying a few times
my ($tries,@parameters,$FD);
$tries=5;
@parameters=(
	$longestAudio{'downloadUrl'}, # URL
	":content_cb" => sub {
		my ($chunk) = @_;
		print $FD $chunk;
	}
);
while ($tries) {
	$tries--;
	my @p=@parameters;
	# if a partial download exists, resume it via a Range request
	my $bytes=-s $file.".part";
	push(@p,"Range","bytes=".$bytes."-") if ($bytes);
	open($FD,">>",$file.".part") or die($!);
	binmode($FD);
	my $result=$browser->get(@p);
	close $FD;
	last if ($result->is_success or $result->code == 416);
}
if ($tries eq 0) {
	print "failed.\n";
	next;
}
rename $file.".part",$file;
# store the show's metadata in ID3v2 tags
my $mp3file=MP3::Tag->new($file);
my $id3v2=($mp3file->{ID3v2} or $mp3file->new_tag("ID3v2"));
$id3v2->add_frame("TPE1","Zündfunk");
$id3v2->add_frame("TIT2",$longestAudio{'title'});
$id3v2->add_frame("COMM","GER","desc",$longestAudio{'description'}) if ($longestAudio{'description'});
$id3v2->write_tag();
print "done.\n";
}
#!/usr/bin/env python3

import requests
import sys
import urllib.error
import urllib.parse
import urllib.request
import os.path
import re
import time
from datetime import datetime, date
import shutil
from tempfile import NamedTemporaryFile
import lxml.etree
from bs4 import BeautifulSoup
from mutagen.id3 import ID3, ID3NoHeaderError, TPE1, TIT2, COMM, APIC
import pprint
def download(url: str, attempts=4):
    # download url to a temporary file; return its path, or None if all attempts fail
    tmpfile = NamedTemporaryFile(delete=False)
    for attempt in range(1, attempts + 1):
        try:
            if attempt > 1:
                time.sleep(10)  # back off before retrying
            with urllib.request.urlopen(url) as stream:
                shutil.copyfileobj(stream, tmpfile)
            tmpfile.close()
            return tmpfile.name
        except (urllib.error.URLError, OSError):
            continue
    return None
if len(sys.argv) != 2:
    print("Usage:", file=sys.stderr)
    print("%s <DownloadDir>\n" % sys.argv[0], file=sys.stderr)
    print("Example:", file=sys.stderr)
    print("%s 'Downloads/Zündfunk Recordings'\n" % sys.argv[0], file=sys.stderr)
    sys.exit(1)

DESTDIR = sys.argv[1]

if not os.path.isdir(DESTDIR):
    print("Directory %s does not exist!" % DESTDIR, file=sys.stderr)
    sys.exit(1)

def time2seconds(timestr: str):
    # return the duration of a HH:MM:SS string in seconds
    parts = re.split(":", timestr)
    return int(parts[0])*3600 + int(parts[1])*60 + int(parts[2])
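# example: time2seconds("01:53:20") == 1*3600 + 53*60 + 20 == 6800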
def safe_text_get(l: list, idx: int, default=None):
    # return the text attribute of a list item, or the default value if it does not exist
    try:
        return l[idx].text
    except IndexError:
        return default
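# example: safe_text_get(xmls[0].xpath("./audio/title"), 0, "unknown") yields the text of
# the first matched <title> node, or "unknown" if the XPath query matched nothing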
baseUrl = ""  # URL of Bayern 2's Zündfunk program page

html = requests.get(baseUrl, timeout=5).text
soup = BeautifulSoup(html, 'lxml')

# extract JSON URL
jsonUrl = None
for className in soup.find('div', id='program_stage')['class']:
    match = re.match(r'.*jsonUrl:\s*[\'\"](.+?)[\'\"]', className)
    if match:
        jsonUrl = match.group(1)
        break

if jsonUrl is None:
    print("ERROR: Could not find JSON file containing the broadcasts", file=sys.stderr)
    sys.exit(1)

# jsonUrl is relative, make it absolute
jsonUrl = urllib.parse.urljoin(baseUrl, jsonUrl)

# fetch JSON
broadcastJson = requests.get(jsonUrl, timeout=5).json()
# a "channelBroadcast" is a episode of a radio show
for bc in broadcastJson['channelBroadcasts']:
if not bc['broadcastStartDate'] or not bc['broadcastEndDate']:
# show's in the future, skip it
# the link to the episode's web page is in the "broadcastHTML" attribute - within HTML
bcSoup = BeautifulSoup(bc['broadcastHtml'], 'lxml')
episodeUrl = bcSoup.find('div', class_='broadcast').find('a', href=True)['href']
episodeUrl = urllib.parse.urljoin(baseUrl, episodeUrl)
episodePage = requests.get(episodeUrl, timeout=5).text
episodePageSoup = BeautifulSoup(episodePage, 'lxml')
# the episode's web page either contains a player, links to websites with a player, or nothing of interest.
# we collect a list of URLs of all those sites
candidates = [ episodeUrl ]
for url in list(link['href'] for link in episodePageSoup.find_all('a',class_=re.compile('link_audio'), href=True)):
candidates.append(urllib.parse.urljoin(baseUrl, url))
    # on each of these pages try to find the player link (<a id="avPlayer_...">) and extract the dataURL from the "onclick" parameter.
    # each dataURL points to an XML resource. Fetch them all!
    xmls = []
    for url in candidates:
        page = requests.get(url, timeout=5).text
        pageSoup = BeautifulSoup(page, 'lxml')
        for player in pageSoup.find_all('a', id=re.compile('^avPlayer'), onclick=True):
            match = re.match(r'^.*dataURL:\s*[\'\"](.+?)[\'\"]', player['onclick'])
            if match:
                # the dataURL contains the URL to an XML file with metadata for the media
                dataUrl = urllib.parse.urljoin(baseUrl, match.group(1))
                xmls.append(lxml.etree.fromstring(requests.get(dataUrl, timeout=5).content))

    # if nothing was found: continue with the next episode
    if len(xmls) == 0:
        continue

    # figure out the best matching XML
    ## sort XMLs according to audio length, longest first
    xmls = sorted(xmls, key=lambda x: time2seconds(x.xpath('./audio/duration')[0].text), reverse=True)
    # extract metadata from the XML with the longest audio
    XMLmeta = {
        'topline': safe_text_get(xmls[0].xpath("./audio/topline"), 0),
        'title': safe_text_get(xmls[0].xpath("./audio/title"), 0),
        'shareTitle': safe_text_get(xmls[0].xpath("./audio/shareTitle"), 0),
        'duration': safe_text_get(xmls[0].xpath("./audio/duration"), 0),
        'channel': safe_text_get(xmls[0].xpath("./audio/channel"), 0, "BAYERN 2"),
        'broadcast': safe_text_get(xmls[0].xpath("./audio/broadcast"), 0),
        'broadcastDate': safe_text_get(xmls[0].xpath("./audio/broadcastDate"), 0, date.today().strftime("%d.%m.%Y")),
        'author': safe_text_get(xmls[0].xpath("./audio/author"), 0),
        'desc': safe_text_get(xmls[0].xpath("./audio/desc"), 0),
        'permalink': safe_text_get(xmls[0].xpath("./audio/permalink"), 0),
        'homepageUrl': safe_text_get(xmls[0].xpath("./audio/homepageUrl"), 0, ""),
        'imageUrl': "https:" + safe_text_get(xmls[0].xpath("./audio/teaserImage/variants/variant[@name='image512']/url"), 0, ""),
        'agf_c9': safe_text_get(xmls[0].xpath("./audio/agf-tracking/c9"), 0),
    }

    # pprint.PrettyPrinter(indent=4).pprint(XMLmeta)
    # continue

    # our own metadata
    meta = {
        'downloadUrl': None,
        'broadcastDate_dt': None,
        'filename': None,
        'filepath': None,
        'duration_ms': time2seconds(XMLmeta['duration']) * 1000,
    }
    ## filter out some episodes

    # I know that a real Zündfunk episode is longer than 45 minutes. Skip this episode if it is shorter
    if meta['duration_ms'] < 45 * 60 * 1000:
        continue

    # skip all non-"Zündfunk" broadcasts like "Zündfunk Generator" (which has an official podcast feed)
    if XMLmeta['broadcast'].lower() != 'zündfunk':
        continue

    # build filename
    filename = XMLmeta['broadcast'] + " " + '-'.join(reversed(XMLmeta['broadcastDate'].split('.'))) + " - " + XMLmeta['title'][0:80] + ".mp3"
    # replace bad characters in filename
    meta['filename'] = re.sub(r'[^\w\s\-\.\[\]]', '_', filename)
    # filename with path
    meta['filepath'] = os.path.join(DESTDIR, meta['filename'])

    # continue with the next episode if the file already exists
    if os.path.isfile(meta['filepath']) and os.path.getsize(meta['filepath']) > 0:
        print("%s already exists, skipping." % meta['filename'], flush=True)
        continue

    ## populate values in the "meta" dict
    # agf_c9 looks like "Zündfunk_Zündfunk_27.08.2020_19:05" or "Zündfunk_Zündfunk Generator_30.08.2020_22:05",
    # so it can be used to extract the episode's exact broadcast time
    try:
        parts = XMLmeta['agf_c9'].split('_')
        meta['broadcastDate_dt'] = datetime.strptime(parts[2] + " " + parts[3], "%d.%m.%Y %H:%M")
    except (AttributeError, IndexError, ValueError):
        # fall back to the broadcast date without a time of day
        meta['broadcastDate_dt'] = datetime.strptime(XMLmeta['broadcastDate'], "%d.%m.%Y")

    # from the XML with the longest audio, get all MP3 audio tracks ("assets")
    mp3Assets = xmls[0].xpath("./audio/assets/asset/codecAudio[contains(.,'mp3') or contains(.,'MP3')]/..")

    # from all MP3 audio tracks select the one with the highest bitrate...
    highestBitrateMp3Asset = sorted(mp3Assets, key=lambda x: int(x.xpath('./bitrateAudio')[0].text), reverse=True)[0]

    # ...and get its downloadUrl
    meta['downloadUrl'] = "https:" + highestBitrateMp3Asset.xpath("./downloadUrl")[0].text
    # download file to a temporary location
    print("Downloading %s..." % meta['filename'], end=" ", flush=True)
    tmpFile = download(meta['downloadUrl'])
    if tmpFile is None:
        print("failed.", flush=True)
        print("ERROR: Could not download %s" % meta['downloadUrl'], file=sys.stderr)
        continue

    # set ID3 tags
    try:
        tag = ID3(tmpFile)
    except ID3NoHeaderError:
        tag = ID3()
    #tag.add(TIT2(text=[meta['broadcastDate_dt'].strftime("%Y-%m-%d") + ": "+XMLmeta['title']]))
    tag.add(TPE1(text=[XMLmeta['broadcast']]))
    tag.add(TIT2(text=[XMLmeta['title']]))
    tag.add(COMM(lang="deu", desc="desc", text=[XMLmeta['desc']]))

    # add cover image (if an image URL was found)
    if XMLmeta['imageUrl'] != "https:":
        response = requests.get(XMLmeta['imageUrl'], timeout=5)
        if response.status_code == 200:
            imageData = response.content
            imageMime = response.headers['content-type']
            if imageData is not None and imageMime is not None:
                tag.add(APIC(mime=imageMime, desc="Front Cover", data=imageData))

    # save ID3 tag
    tag.save(tmpFile, v2_version=3)

    # done
    shutil.move(tmpFile, meta['filepath'])
    print("done.", flush=True)