Add the beginning of the programmation feature.

This commit is contained in:
Totonyus 2022-12-11 22:50:56 +01:00
parent 0516d407aa
commit 0aea70129b
3 changed files with 735 additions and 52 deletions

View File

@ -16,3 +16,21 @@ app_defaults = {
'_redis_host': 'ydl_api_ng',
'_redis_port': 6379
}
programmation_object_default = {
'id': None,
'url': None,
'user_token': None,
'enabled': True,
'planning': {
'recording_start_date': None,
'recording_duration': None,
'recording_stops_at_end': False,
'recurrence_cron': None,
'recurrence_start_date': None,
'recurrence_end_date': None,
},
'presets': None
}

226
programmation_manager.py Normal file
View File

@ -0,0 +1,226 @@
import copy
import json
import logging
import logging.handlers as handlers
import os
from redis import Redis
import defaults
import uuid
from tinydb import TinyDB, Query
from cronsim import CronSim
from datetime import datetime, timedelta
class ProgrammationManager:
def __init__(self, database_file=None, *args, **kwargs):
self.__database_file = database_file if database_file is not None else 'database.json'
self.__db = TinyDB(self.__database_file)
self.__scheduled_jobs_table = self.__db.table('scheduled_jobs')
@staticmethod
def generate_identifier(*args, **kwargs):
return f'{uuid.uuid4()}'
# Add values that wasn't provided in request
def generate_programmation(self, source_programmation=defaults.programmation_object_default, programmation=None,
*args, **kwargs):
full_programmation = copy.deepcopy(source_programmation)
for entry, value in programmation.items():
if entry == 'planning':
for planning_entry, planning_value in value.items():
full_programmation[entry][planning_entry] = planning_value
else:
full_programmation[entry] = value
full_programmation['id'] = self.generate_identifier()
return full_programmation
def add_programmation(self, programmation=None, *args, **kwargs):
generated_programmation = self.generate_programmation(programmation=programmation)
programmation_validation_result = self.validate_programmation(generated_programmation)
if len(programmation_validation_result) != 0:
return programmation_validation_result, None
else:
self.__scheduled_jobs_table.insert(generated_programmation)
return programmation_validation_result, generated_programmation
def validate_programmation(self, programmation=None, *args, **kwargs):
errors_list = []
date_controls = ['recurrence_start_date', 'recurrence_end_date', 'recording_start_date']
for field in date_controls:
if programmation.get('planning').get(field) is not None:
try:
datetime.fromisoformat(programmation.get('planning').get(field))
except Exception as error:
errors_list.append({'field': field, 'error': error})
if programmation.get('url') is None or programmation.get('url') == '':
errors_list.append({'field': 'url', 'error': 'url is empty'})
if type(programmation.get('planning').get('recording_stops_at_end')) != bool:
errors_list.append({'field': 'recording_stops_at_end', 'error': 'must be a boolean'})
if type(programmation.get('enabled')) != bool:
errors_list.append({'field': 'enabled', 'error': 'must be a boolean'})
if programmation.get('presets') is not None and type(programmation.get('presets')) != list:
errors_list.append({'field': 'presets', 'error': 'must be None or list'})
if programmation.get('planning').get('recording_duration') is not None and type(
programmation.get('planning').get('recording_duration')) != int:
errors_list.append({'field': 'recording_duration', 'error': 'must be None or int'})
if programmation.get('planning').get('recurrence_cron') is not None and programmation.get('planning').get(
'recording_start_date') is not None:
errors_list.append({'field': 'recurrence_cron', 'error': 'cannont be use with recording_start_date field'})
errors_list.append({'field': 'recording_start_date', 'error': 'cannont be use with recurrence_cron field'})
try:
if programmation.get('planning').get('recurrence_cron') is not None:
cron = CronSim(programmation.get('planning').get('recurrence_cron'), datetime.now())
if programmation.get('planning').get('recurrence_start_date') is None:
programmation.get('planning')['recurrence_start_date'] = datetime.isoformat(next(cron), sep=' ')
except Exception as e:
errors_list.append({'field': 'recurrence_cron', 'error': e})
return errors_list
def get_all_programmations(self, *args, **kwargs):
return self.__scheduled_jobs_table.all()
def get_all_enabled_programmations(self, *args, **kwargs):
return self.__scheduled_jobs_table.search(Query().enabled == True)
def get_programmation_by_id(self, id=None, *args, **kwargs):
result = self.__scheduled_jobs_table.search(Query().id == id)
return result[0] if len(result) != 0 else None
def delete_programmation_by_id(self, id=None, *args, **kwargs):
# TODO delete in redis queue
return self.__scheduled_jobs_table.remove(Query().id == id)
def get_all_from_date(self, from_date=datetime.now(), *args, **kwargs):
all_programmations = self.get_all_programmations()
programmations = []
for programmation in all_programmations:
end_date = self.get_end_date(programmation)
if end_date is not None and from_date > end_date:
programmations.append(programmation)
return programmations
def get_end_date(self, programmation=None, *args, **kwargs):
planning = programmation.get('planning')
recording_start_date = planning.get('recording_start_date')
recording_duration = planning.get('recording_duration')
recurrence_cron = planning.get('recurrence_cron')
recurrence_start_date = planning.get('recurrence_start_date')
recurrence_end_date = planning.get('recurrence_end_date')
end_date = None
if planning is None:
return None
if recording_start_date is not None and recording_duration is not None:
end_date = datetime.fromisoformat(recording_start_date) + timedelta(minutes=recording_duration)
if recurrence_end_date is not None:
end_date = datetime.fromisoformat(recurrence_end_date)
if recurrence_end_date is not None and recording_duration is not None:
end_date = datetime.fromisoformat(recurrence_end_date) + timedelta(minutes=recording_duration)
if recurrence_end_date is not None and recurrence_cron is not None:
end_date = datetime.fromisoformat(recurrence_start_date)
cron = CronSim(recurrence_cron, datetime.fromisoformat(recurrence_start_date))
next_iteration = next(cron)
while next_iteration < datetime.fromisoformat(recurrence_end_date):
end_date = next_iteration
next_iteration = next(cron)
if recording_duration is not None:
end_date = end_date + timedelta(minutes=recording_duration)
return end_date
def update_programmation_by_id(self, id=None, programmation=None, *args, **kwargs):
stored_programmation = self.get_programmation_by_id(id)
if stored_programmation is None:
return None
changed_programmation = self.generate_programmation(source_programmation=stored_programmation,
programmation=programmation)
validation_result = self.validate_programmation(programmation=changed_programmation)
changed_programmation['id'] = stored_programmation.get('id')
if len(validation_result) != 0:
return validation_result, None
else:
self.__scheduled_jobs_table.update(changed_programmation, Query().id == stored_programmation.get('id'))
return validation_result, changed_programmation
def purge_all_past_programmations(self, from_date=datetime.now(), *args, **kwargs):
programmations = self.get_all_from_date(from_date=from_date)
for programmation in programmations:
self.delete_programmation_by_id(programmation.get('id'))
return programmations
def get_next_execution(self, programmation=None, from_date=datetime.now(), *args, **kwargs):
planning = programmation.get('planning')
recording_start_date = planning.get('recording_start_date')
recurrence_cron = planning.get('recurrence_cron')
recurrence_start_date = planning.get('recurrence_start_date')
next_iteration = None
if recording_start_date is not None:
next_iteration = datetime.fromisoformat(recording_start_date)
if recurrence_cron is not None:
cron = CronSim(recurrence_cron, from_date - timedelta(minutes=1))
next_iteration = next(cron)
while next_iteration < datetime.fromisoformat(recurrence_start_date):
next_iteration = next(cron)
# If None, must launch it immediately
return next_iteration
def must_be_restarted(self, programmation=None, from_date=datetime.now(), *args, **kwargs):
planning = programmation.get('planning')
recording_start_date = planning.get('recording_start_date')
recording_duration = planning.get('recording_duration')
recurrence_cron = planning.get('recurrence_cron')
recurrence_start_date = planning.get('recurrence_start_date')
if recording_start_date is not None:
start_date = datetime.fromisoformat(recording_start_date)
elif recurrence_start_date is not None:
start_date = next(CronSim(recurrence_cron, from_date - timedelta(minutes=recording_duration)))
else:
return False
if start_date < from_date < start_date + timedelta(minutes=recording_duration):
return True
else:
return False

View File

@ -1,4 +1,5 @@
import copy
import os
import unittest
import requests
@ -6,6 +7,7 @@ import requests
import config_manager
import download_manager
from datetime import datetime, timedelta
class TestActualParametersFile(unittest.TestCase):
def test_app(self):
@ -53,9 +55,12 @@ class TestConfig(unittest.TestCase):
self.assertEqual('bestaudio', self.config_manager.get_preset_params('audio').get('format'))
def test_expand(self):
self.assertEqual('videos/%(webpage_url_domain)s/%(title)s_(%(height)s).%(ext)s', self.config_manager.get_preset_params('DEFAULT').get('outtmpl').get('default'))
self.assertEqual('videos/%(webpage_url_domain)s/%(title)s_(%(height)s).%(ext)s', self.config_manager.get_preset_params('SHARE').get('outtmpl').get('default'))
self.assertNotEqual('/home/videos/%(webpage_url_domain)s/%(title)s_(%(height)s).%(ext)s', self.config_manager.get_preset_params('DEFAULT').get('outtmpl').get('default'))
self.assertEqual('videos/%(webpage_url_domain)s/%(title)s_(%(height)s).%(ext)s',
self.config_manager.get_preset_params('DEFAULT').get('outtmpl').get('default'))
self.assertEqual('videos/%(webpage_url_domain)s/%(title)s_(%(height)s).%(ext)s',
self.config_manager.get_preset_params('SHARE').get('outtmpl').get('default'))
self.assertNotEqual('/home/videos/%(webpage_url_domain)s/%(title)s_(%(height)s).%(ext)s',
self.config_manager.get_preset_params('DEFAULT').get('outtmpl').get('default'))
self.assertFalse(self.config_manager.get_preset_params('playlist').get('noplaylist'))
self.assertTrue(self.config_manager.get_preset_params('DEFAULT').get('noplaylist'))
@ -68,18 +73,21 @@ class TestConfig(unittest.TestCase):
self.assertIsNotNone(self.config_manager.get_user_param_by_token('dad_super_password'))
self.assertIsNone(self.config_manager.get_user_param_by_token('doggo_super_password'))
self.assertIsNone(self.config_manager.get_user_param_by_token('dad_super_password').get('doesnotexists'))
self.assertEqual('./downloads/symlink_to_dad_home/', self.config_manager.get_user_param_by_token('dad_super_password').get('paths').get('home'))
self.assertEqual('./downloads/symlink_to_dad_home/',
self.config_manager.get_user_param_by_token('dad_super_password').get('paths').get('home'))
def test_auth(self):
self.assertEqual('Totonyus', self.config_manager.get_auth_params('DAILYMOTION').get('username'))
self.assertEqual('is_this_site_still_running_fr_?', self.config_manager.get_auth_params('DAILYMOTION').get('password'))
self.assertEqual('is_this_site_still_running_fr_?',
self.config_manager.get_auth_params('DAILYMOTION').get('password'))
class TestUtils(unittest.TestCase):
config_manager = config_manager.ConfigManager('params/params.sample.ini')
def test_get_presets(self):
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=OWCK3413Wuk', None, None)
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=OWCK3413Wuk', None,
None)
dm.flush_presets()
self.assertIsNotNone(dm.get_presets_objects(['AUDIO']))
@ -135,119 +143,160 @@ class TestUtils(unittest.TestCase):
self.assertFalse(dm.no_preset_found)
def test_is_from_playlist(self):
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/playlist?list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj', None, None)
dm = download_manager.DownloadManager(self.config_manager,
'https://www.youtube.com/playlist?list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj',
None, None)
self.assertTrue(dm.check_if_from_playlist())
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y&list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj&index=2', None, None)
dm = download_manager.DownloadManager(self.config_manager,
'https://www.youtube.com/watch?v=5UErWGjj95Y&list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj&index=2',
None, None)
self.assertTrue(dm.check_if_from_playlist())
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, None)
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
None)
self.assertFalse(dm.check_if_from_playlist())
dm = download_manager.DownloadManager(self.config_manager, 'https://twitter.com/Totonyus/status/1471896525617909760', None, None)
dm = download_manager.DownloadManager(self.config_manager,
'https://twitter.com/Totonyus/status/1471896525617909760', None, None)
self.assertIsNone(dm.check_if_from_playlist())
def test_is_video(self):
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/playlist?list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj', None, None)
dm = download_manager.DownloadManager(self.config_manager,
'https://www.youtube.com/playlist?list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj',
None, None)
self.assertFalse(dm.check_if_video())
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y&list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj&index=2', None, None)
dm = download_manager.DownloadManager(self.config_manager,
'https://www.youtube.com/watch?v=5UErWGjj95Y&list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj&index=2',
None, None)
self.assertTrue(dm.check_if_video())
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, None)
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
None)
self.assertTrue(dm.check_if_video())
dm = download_manager.DownloadManager(self.config_manager, 'https://twitter.com/Totonyus/status/1471896525617909760', None, None)
dm = download_manager.DownloadManager(self.config_manager,
'https://twitter.com/Totonyus/status/1471896525617909760', None, None)
self.assertIsNone(dm.check_if_video())
def test_can_be_checked(self):
default_preset = self.config_manager.get_preset_params('DEFAULT')
playlist_preset = self.config_manager.get_preset_params('PLAYLIST')
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/playlist?list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj', None, None)
dm = download_manager.DownloadManager(self.config_manager,
'https://www.youtube.com/playlist?list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj',
None, None)
self.assertFalse(dm.can_url_be_checked(default_preset))
self.assertTrue(dm.presets[0].get('ignoreerrors'))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y&list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj&index=2', None, None)
dm = download_manager.DownloadManager(self.config_manager,
'https://www.youtube.com/watch?v=5UErWGjj95Y&list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj&index=2',
None, None)
self.assertTrue(dm.can_url_be_checked(default_preset))
self.assertFalse(dm.presets[0].get('ignoreerrors'))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, None)
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
None)
self.assertTrue(dm.can_url_be_checked(default_preset))
self.assertFalse(dm.presets[0].get('ignoreerrors'))
dm = download_manager.DownloadManager(self.config_manager, 'https://twitter.com/Totonyus/status/1471896525617909760', None, None)
dm = download_manager.DownloadManager(self.config_manager,
'https://twitter.com/Totonyus/status/1471896525617909760', None, None)
self.assertTrue(dm.can_url_be_checked(default_preset))
self.assertTrue(dm.presets[0].get('ignoreerrors'))
def test_get_permission(self):
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, 'dad_super_password')
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
'dad_super_password')
self.assertIsInstance(dm.is_user_permitted(True), config_manager.SectionConfig)
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, 'doggo_super_password')
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
'doggo_super_password')
self.assertFalse(dm.is_user_permitted(True))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, None)
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
None)
self.assertFalse(dm.is_user_permitted(True))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, 'dad_super_password')
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
'dad_super_password')
self.assertIsNone(dm.is_user_permitted(False))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, 'doggo_super_password')
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
'doggo_super_password')
self.assertIsNone(dm.is_user_permitted(False))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, None)
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
None)
self.assertIsNone(dm.is_user_permitted(False))
def test_get_preset_for_user(self):
default_preset = self.config_manager.get_preset_params('DEFAULT')
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, 'totonyus_super_password')
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
'totonyus_super_password')
self.assertEqual(['fr', 'en'], dm.get_preset_for_user(copy.deepcopy(default_preset)).get('subtitleslangs'))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, 'totonyus_super_password')
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
'totonyus_super_password')
self.assertEqual('./downloads/', dm.get_preset_for_user(copy.deepcopy(default_preset)).get('paths').get('home'))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, 'totonyus_super_password')
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
'totonyus_super_password')
self.assertEqual('DEFAULT', dm.get_preset_for_user(copy.deepcopy(default_preset)).get('_name'))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, 'dad_super_password')
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
'dad_super_password')
self.assertEqual(['en'], dm.get_preset_for_user(copy.deepcopy(default_preset)).get('subtitleslangs'))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, 'dad_super_password')
self.assertEqual('./downloads/symlink_to_dad_home/', dm.get_preset_for_user(copy.deepcopy(default_preset)).get('paths').get('home'))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
'dad_super_password')
self.assertEqual('./downloads/symlink_to_dad_home/',
dm.get_preset_for_user(copy.deepcopy(default_preset)).get('paths').get('home'))
def test_simulate(self):
default_preset = self.config_manager.get_preset_params('DEFAULT')
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None, None)
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', None,
None)
self.assertTrue(dm.simulate_download(default_preset))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UfErWGjj95', None, None)
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UfErWGjj95', None,
None)
self.assertFalse(dm.simulate_download(default_preset))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/playlist?list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj', None, None)
dm = download_manager.DownloadManager(self.config_manager,
'https://www.youtube.com/playlist?list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj',
None, None)
self.assertIsNone(dm.simulate_download(default_preset))
@unittest.skip
def test_process_download(self):
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', ['DEFAULT'], None)
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y',
['DEFAULT'], None)
self.assertTrue(dm.process_download(dm.presets[0]))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', ['AUDIO'], None)
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y',
['AUDIO'], None)
self.assertTrue(dm.process_download(dm.presets[0]))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5dUErWGjj95', ['AUDIO'], None)
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5dUErWGjj95',
['AUDIO'], None)
self.assertFalse(dm.process_download(dm.presets[0]))
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y', ['AUDIO'], 'dad_super_password')
dm = download_manager.DownloadManager(self.config_manager, 'https://www.youtube.com/watch?v=5UErWGjj95Y',
['AUDIO'], 'dad_super_password')
self.assertTrue(dm.process_download(dm.presets[0]))
def test_sanitize(self):
self.config_manager.get_user_param_by_token('dad_super_password'),
self.assertEqual('dad_super_password', self.config_manager.get_user_param_by_token('dad_super_password').get('_token'))
self.assertEqual('dad_super_password',
self.config_manager.get_user_param_by_token('dad_super_password').get('_token'))
self.assertIsNone(self.config_manager.sanitize_config_object(self.config_manager.get_all_users_params()).get('DAD').get('_token'))
self.assertIsNone(
self.config_manager.sanitize_config_object(self.config_manager.get_all_users_params()).get('DAD').get(
'_token'))
@unittest.skip
@ -255,20 +304,30 @@ class TestAPI(unittest.TestCase):
base_url = "http://localhost:5011/download"
def test_download(self):
self.assertEqual(401, requests.get(f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk').status_code)
self.assertEqual(202, requests.get(f'{self.base_url}?url=https://www.youtube.com/playlist?list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj&token=dad_super_password').status_code)
self.assertEqual(200, requests.get(f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=dad_super_password').status_code)
self.assertEqual(401, requests.get(f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=mom_super_password').status_code)
self.assertEqual(401,
requests.get(f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk').status_code)
self.assertEqual(202, requests.get(
f'{self.base_url}?url=https://www.youtube.com/playlist?list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj&token=dad_super_password').status_code)
self.assertEqual(200, requests.get(
f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=dad_super_password').status_code)
self.assertEqual(401, requests.get(
f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=mom_super_password').status_code)
self.assertEqual(400, requests.get(f'{self.base_url}?url=&token=dad_super_password').status_code)
self.assertEqual(422, requests.get(f'{self.base_url}?token=dad_super_password').status_code)
self.assertEqual(200, requests.get(f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=dad_super_password&presets=hd').status_code)
self.assertEqual(200, requests.get(f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=totonyus_super_password&presets=hd').status_code)
self.assertEqual(200, requests.get(f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=dad_super_password&presets=hd,audio').status_code)
self.assertEqual(200, requests.get(f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=dad_super_password&presets=HD,AUDIO').status_code)
self.assertEqual(206, requests.get(f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=dad_super_password&presets=hd,audio,4k').status_code)
self.assertEqual(200, requests.get(
f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=dad_super_password&presets=hd').status_code)
self.assertEqual(200, requests.get(
f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=totonyus_super_password&presets=hd').status_code)
self.assertEqual(200, requests.get(
f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=dad_super_password&presets=hd,audio').status_code)
self.assertEqual(200, requests.get(
f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=dad_super_password&presets=HD,AUDIO').status_code)
self.assertEqual(206, requests.get(
f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=dad_super_password&presets=hd,audio,4k').status_code)
def test_download_info(self):
request_json = requests.get(f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=dad_super_password&presets=hd,audio,4k').json()
request_json = requests.get(
f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=dad_super_password&presets=hd,audio,4k').json()
self.assertTrue(request_json.get('all_downloads_checked'))
self.assertFalse(request_json.get('no_preset_found'))
@ -279,7 +338,8 @@ class TestAPI(unittest.TestCase):
self.assertEqual('HD', request_json.get('downloads')[0].get('_name'))
self.assertEqual('AUDIO', request_json.get('downloads')[1].get('_name'))
request_json = requests.get(f'{self.base_url}?url=https://www.youtube.com/watch?v=fdsOWCK3413Wuk&token=dad_super_password&presets=4K,8K').json()
request_json = requests.get(
f'{self.base_url}?url=https://www.youtube.com/watch?v=fdsOWCK3413Wuk&token=dad_super_password&presets=4K,8K').json()
self.assertTrue(request_json.get('all_downloads_checked'))
self.assertTrue(request_json.get('no_preset_found'))
@ -289,7 +349,8 @@ class TestAPI(unittest.TestCase):
self.assertEqual(0, request_json.get('passed_checks'))
self.assertEqual('DEFAULT', request_json.get('downloads')[0].get('_name'))
request_json = requests.get(f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=dad_super_password').json()
request_json = requests.get(
f'{self.base_url}?url=https://www.youtube.com/watch?v=OWCK3413Wuk&token=dad_super_password').json()
self.assertTrue(request_json.get('all_downloads_checked'))
self.assertFalse(request_json.get('no_preset_found'))
@ -299,7 +360,8 @@ class TestAPI(unittest.TestCase):
self.assertEqual(1, request_json.get('passed_checks'))
self.assertEqual('DEFAULT', request_json.get('downloads')[0].get('_name'))
request_json = requests.get(f'{self.base_url}?url=https://www.youtube.com/playlist?list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj&token=dad_super_password').json()
request_json = requests.get(
f'{self.base_url}?url=https://www.youtube.com/playlist?list=PL8Zccvo5Xlj53ESRIn2Q4lg2DvKIB92sj&token=dad_super_password').json()
self.assertFalse(request_json.get('all_downloads_checked'))
self.assertFalse(request_json.get('no_preset_found'))
@ -310,5 +372,382 @@ class TestAPI(unittest.TestCase):
self.assertEqual('DEFAULT', request_json.get('downloads')[0].get('_name'))
import programmation_manager
class TestProgrammation(unittest.TestCase):
database_file = 'test_database.json'
try:
os.remove(database_file)
except FileNotFoundError:
pass
pm = programmation_manager.ProgrammationManager(database_file=database_file)
def test_add_programmation(self):
### URL is invalid
validation_result, added_programmation = self.pm.add_programmation(programmation=(
{
'url': None
}
))
self.assertEqual(1, len(validation_result))
self.assertIsNone(added_programmation)
### PRESET is wrong
validation_result, added_programmation = self.pm.add_programmation(programmation=(
{
'url': 'a more valid_url',
'presets': "non valid presets"
}
))
self.assertEqual(1, len(validation_result))
self.assertIsNone(added_programmation)
### ALL is right
validation_result, added_programmation = self.pm.add_programmation(programmation=(
{
'url': 'a more valid_url',
}
))
self.assertEqual(0, len(validation_result))
self.assertIsNotNone(added_programmation)
### The THREE DATES are wrong
validation_result, added_programmation = self.pm.add_programmation(programmation=(
{
'url': "a more valid_url",
'planning': {
'recording_start_date': '2022-12-00 00:00',
'recurrence_start_date': '2022-12-00 00:00',
'recurrence_end_date': '2022-12-00 00:00',
},
}
))
self.assertEqual(3, len(validation_result))
self.assertIsNone(added_programmation)
### Everything is right
validation_result, added_programmation = self.pm.add_programmation(programmation=(
{
'url': "a more valid_url",
'planning': {
'recording_start_date': '2022-12-01 00:00',
'recurrence_start_date': '2022-12-01 00:00',
'recurrence_end_date': '2022-12-02 00:00',
},
}
))
self.assertEqual(0, len(validation_result))
self.assertIsNotNone(added_programmation)
validation_result, added_programmation = self.pm.add_programmation(programmation=(
{
'url': "a valid url",
'planning': {
'recording_duration': 'not valid',
},
}
))
self.assertEqual(1, len(validation_result))
self.assertIsNone(added_programmation)
### Known range date
validation_result, added_programmation = self.pm.add_programmation(programmation=(
{
'url': "a valid url",
'planning': {
'recurrence_cron': 'invalid'
},
}
))
self.assertEqual(1, len(validation_result))
self.assertIsNone(added_programmation)
validation_result, added_programmation = self.pm.add_programmation(programmation=(
{
'url': "a valid url",
'planning': {
'recurrence_cron': '00 12 * * *',
'recording_start_date': '2022-12-01 10:00'
},
}
))
self.assertEqual(2, len(validation_result))
self.assertIsNone(added_programmation)
validation_result, added_programmation = self.pm.add_programmation(programmation=(
{
'url': "a special test",
'planning': {
'recurrence_cron': '00 13 * * *',
'recurrence_start_date': None,
},
}
))
self.assertEqual(0, len(validation_result))
self.assertIsNotNone(added_programmation)
###
### Add useful tests cases
###
### Continuous downloading
validation_result, added_programmation = self.pm.add_programmation(programmation=(
{
'url': "a valid url",
}
))
self.assertEqual(0, len(validation_result))
self.assertIsNotNone(added_programmation)
### Known date
validation_result, added_programmation = self.pm.add_programmation(programmation=(
{
'url': "a valid url",
'planning': {
'recording_start_date': '2022-12-01 00:00',
'recording_duration': 120,
},
}
))
self.assertEqual(0, len(validation_result))
self.assertIsNotNone(added_programmation)
### Known date but disabled
validation_result, added_programmation = self.pm.add_programmation(programmation=(
{
'id': '0',
'url': "a valid url",
'enabled': False,
'planning': {
'recording_start_date': '2022-12-01 00:00',
'recording_duration': 120,
},
}
))
self.assertEqual(0, len(validation_result))
self.assertIsNotNone(added_programmation)
self.assertFalse(added_programmation.get('enabled'))
self.assertNotEqual('0', added_programmation.get('id'))
### Known range date
validation_result, added_programmation = self.pm.add_programmation(programmation=(
{
'url': "a valid url",
'planning': {
'recurrence_cron': '00 12 * * *',
'recording_duration': 60,
'recurrence_start_date': '2022-12-01 00:00',
'recurrence_end_date': '2022-12-31 23:59',
},
}
))
self.assertEqual(0, len(validation_result))
self.assertIsNotNone(added_programmation)
### Known range date
validation_result, added_programmation = self.pm.add_programmation(programmation=(
{
'url': "a valid url",
'planning': {
'recurrence_cron': '00 13 * * *',
'recording_duration': 60,
'recurrence_start_date': '2022-12-01 00:00',
'recurrence_end_date': '2022-12-31 23:59',
},
}
))
self.assertEqual(0, len(validation_result))
self.assertIsNotNone(added_programmation)
self.assertTrue(added_programmation.get('enabled'))
self.last_added_id = added_programmation.get('id')
validation_result, changed_programmation = self.pm.update_programmation_by_id(
id=self.last_added_id,
programmation={
'enabled': False
})
self.assertEqual(0, len(validation_result))
self.assertIsNotNone(changed_programmation)
self.assertEqual('2022-12-01 13:00:00',
datetime.isoformat(self.pm.get_next_execution(programmation=changed_programmation,
from_date=datetime.fromisoformat(
'2022-11-30 12:00')), sep=' '))
validation_result, changed_programmation = self.pm.update_programmation_by_id(
id=self.last_added_id,
programmation={
'planning': {
'recurrence_start_date': '2022-12-00 12:00'
}
})
self.assertNotEqual(0, len(validation_result))
self.assertIsNone(changed_programmation)
### ACCESS
self.assertEqual(8, len(self.pm.get_all_programmations()))
self.assertEqual(6, len(self.pm.get_all_enabled_programmations()))
latest_created_programmation = self.pm.get_programmation_by_id(id=self.last_added_id)
self.assertIsNotNone(latest_created_programmation)
self.assertFalse(latest_created_programmation.get('enabled'))
self.assertIsNone(self.pm.get_programmation_by_id('0'))
self.assertIsNotNone(self.pm.delete_programmation_by_id(id=self.last_added_id))
self.assertEqual(6, len(self.pm.get_all_enabled_programmations()))
def test_end_dates_manipulation(self):
self.assertEqual('2022-12-01 02:00:00',
datetime.isoformat(
self.pm.get_end_date(programmation=self.pm.generate_programmation(programmation={
'url': "a valid url",
'planning': {
'recording_start_date': '2022-12-01 00:00',
'recording_duration': 120,
},
})), sep=' '))
self.assertEqual('2022-12-01 00:00:00',
datetime.isoformat(
self.pm.get_end_date(programmation=self.pm.generate_programmation(programmation={
'url': "a valid url",
'planning': {
'recurrence_end_date': '2022-12-01 00:00',
},
})), sep=' '))
self.assertEqual('2022-12-01 02:00:00',
datetime.isoformat(
self.pm.get_end_date(programmation=self.pm.generate_programmation(programmation={
'url': "a valid url",
'planning': {
'recurrence_end_date': '2022-12-01 00:00',
'recording_duration': 120,
},
})), sep=' '))
self.assertEqual('2022-12-31 12:15:00',
datetime.isoformat(
self.pm.get_end_date(programmation=self.pm.generate_programmation(programmation={
'url': "a valid url",
'planning': {
'recurrence_cron': '15 12 * * *',
'recurrence_start_date': '2022-12-30 00:00',
'recurrence_end_date': '2022-12-31 23:59',
},
})), sep=' '))
self.assertEqual('2022-12-31 13:15:00',
datetime.isoformat(
self.pm.get_end_date(programmation=self.pm.generate_programmation(programmation={
'url': "a valid url",
'planning': {
'recurrence_cron': '15 12 * * *',
'recording_duration': 60,
'recurrence_start_date': '2022-12-30 00:00',
'recurrence_end_date': '2022-12-31 23:59',
},
})), sep=' '))
self.assertEqual(3, len(self.pm.purge_all_past_programmations(
from_date=datetime.fromisoformat('2022-12-03 01:00'))))
self.assertEqual(4, len(self.pm.get_all_programmations()))
def test_restart(self):
self.assertFalse(self.pm.must_be_restarted(
from_date=datetime.fromisoformat('2022-12-01 12:00'),
programmation={
'url': "a valid url",
'planning': {
'recording_start_date': '2022-12-01 00:00',
'recording_duration': 120,
},
}))
self.assertTrue(self.pm.must_be_restarted(
from_date=datetime.fromisoformat('2022-12-01 01:00'),
programmation={
'url': "a valid url",
'planning': {
'recording_start_date': '2022-12-01 00:00',
'recording_duration': 120,
},
}))
self.assertFalse(self.pm.must_be_restarted(
from_date=datetime.fromisoformat('2022-12-01 15:00'),
programmation={
'url': "a valid url",
'planning': {
'recurrence_cron': '00 12 * * *',
'recurrence_start_date': '2022-12-01 00:00',
'recording_duration': 120,
},
}))
self.assertTrue(self.pm.must_be_restarted(
from_date=datetime.fromisoformat('2022-12-01 13:00'),
programmation={
'url': "a valid url",
'planning': {
'recurrence_cron': '00 12 * * *',
'recurrence_start_date': '2022-12-01 00:00',
'recording_duration': 120,
},
}))
self.assertFalse(self.pm.must_be_restarted(
from_date=datetime.fromisoformat('2022-12-01 13:00'),
programmation={
'url': "a valid url",
'planning': {},
}))
self.assertFalse(self.pm.must_be_restarted(
from_date=datetime.fromisoformat('2022-12-02 15:00'),
programmation={
'url': "a valid url",
'planning': {
'recurrence_cron': '00 12 * * *',
'recurrence_start_date': '2022-12-01 00:00',
'recurrence_end_date': '2022-12-02 23:00',
'recording_duration': 120,
},
}))
self.assertTrue(self.pm.must_be_restarted(
from_date=datetime.fromisoformat('2022-12-02 13:00'),
programmation={
'url': "a valid url",
'planning': {
'recurrence_cron': '00 12 * * *',
'recurrence_start_date': '2022-12-01 00:00',
'recurrence_end_date': '2022-12-02 23:00',
'recording_duration': 120,
},
}))
if __name__ == '__main__':
unittest.main()