579 lines
21 KiB
Python
579 lines
21 KiB
Python
import logging

import rq

from redis import Redis

from rq import Worker, Queue

from rq.command import send_kill_horse_command

from rq.job import Job

import os

import signal

import psutil

import re

import pathlib

import inspect

from mergedeep import merge

# Project-local module: builds and runs the actual yt-dlp downloads.
import download_manager

# User-provided hooks (params package) take precedence; fall back to the
# bundled defaults from the setup package when no custom hooks module exists.
try:
    from params import ydl_api_hooks
except ImportError as e:
    logging.warning(e)
    from setup import ydl_api_hooks
|
class ProcessUtils:
    def __init__(self, config_manager, queue_name="ydl_api_ng"):
        """Utility facade around process management (psutil) and RQ job/registry management.

        :param config_manager: application config manager; provides ``get_app_params()``
            and the sanitization helpers used for API output
        :param queue_name: name of the RQ queue to operate on
        """
        self.__cm = config_manager
        self.queue_name = queue_name

        # Redis/RQ mode is only active when '_enable_redis' is explicitly True.
        # Fix: the original `x is not None and x is True` double-test was
        # redundant — `is True` alone already excludes None.
        if self.__cm.get_app_params().get('_enable_redis') is True:
            self.redis = Redis(host=self.__cm.get_app_params().get('_redis_host'),
                               port=self.__cm.get_app_params().get('_redis_port'))
            self.queue = Queue(queue_name, connection=self.redis)
            # Registry name -> RQ registry object; 'pending_job' maps to the
            # queue itself (its job ids are the pending ones).
            self.registries = {'pending_job': self.queue,
                               'started_job': self.queue.started_job_registry,
                               'finished_job': self.queue.finished_job_registry,
                               'failed_job': self.queue.failed_job_registry,
                               'deferred_job': self.queue.deferred_job_registry,
                               'scheduled_job': self.queue.scheduled_job_registry,
                               'canceled_job': self.queue.canceled_job_registry,
                               }
            # Subset of registries scanned when looking up programmations.
            self.programmation_registries = {
                'pending_job': self.queue,
                'started_job': self.queue.started_job_registry,
            }
        else:
            self.redis = None
            self.queue = None
            self.registries = None
            # Fix: this attribute was never set in non-redis mode, so any call
            # reaching find_job_by_programmation_id raised AttributeError.
            self.programmation_registries = None
def terminate_active_download(self, id, background_tasks=None):
|
|
if self.redis is None or (self.redis is not None and self.redis is False):
|
|
return self.terminate_basic_active_download(id, background_tasks=background_tasks)
|
|
else:
|
|
return self.terminate_redis_active_download(id, background_tasks=background_tasks)
|
|
|
|
    def terminate_basic_active_download(self, pid, background_tasks=None):
        """Terminate a child download process by pid (non-redis mode).

        :param pid: pid of the process to stop (must be a child of this app)
        :param background_tasks: optional background task runner
        :return: filename info dict of the interrupted file, or None when the
            pid is unknown / not a child of this application
        """
        # Only descendants of this process may be terminated.
        child = self.get_child_object(int(pid))

        if child is not None:
            logging.getLogger('process_utils').info(f'PID {pid} will be terminated')

            child.terminate()
            # Derive the destination file from the process command line.
            filename_info = self.get_current_download_file_destination(child.cmdline())

            # Rename the .part file now, or defer it to a background task.
            if background_tasks is not None:
                background_tasks.add_task(self.ffmpeg_terminated_file, filename_info=filename_info)
            else:
                self.ffmpeg_terminated_file(filename_info=filename_info)

            # Optional user hook, invoked only when defined.
            if callable(getattr(ydl_api_hooks, 'post_termination_handler', None)):
                ydl_api_hooks.post_termination_handler(self.__cm, filename_info)

            return filename_info
        else:
            logging.getLogger('process_utils').error(f'PID {pid} does not exist or does not belong to the application')

            return None
def find_ffmpeg_filename_info(self, job):
|
|
child_process = psutil.Process(job.get('worker').pid).children(recursive=True)
|
|
|
|
for process in child_process:
|
|
if process.name() == "ffmpeg":
|
|
info = self.get_current_download_file_destination(process.cmdline())
|
|
return process.pid, info
|
|
|
|
return None, None
|
|
|
|
def find_ffmpeg_filename_info_by_pid(self, pid):
|
|
child_process = psutil.Process(pid).children(recursive=True)
|
|
|
|
for process in child_process:
|
|
if process.name() == "ffmpeg":
|
|
info = self.get_current_download_file_destination(process.cmdline())
|
|
return process.pid, info
|
|
|
|
return None, None
|
|
|
|
def terminate_redis_download_by_programmation_id(self, programmation_id=None, *args, **kwargs):
|
|
found_job = self.find_job_by_programmation_id(programmation_id)
|
|
self.terminate_redis_active_download(found_job.get('id'))
|
|
|
|
def terminate_redis_active_download(self, search_job_id, background_tasks=None):
|
|
job = self.find_in_running(search_job_id)
|
|
|
|
if job is not None:
|
|
ffmpeg_killed = False
|
|
process_pid, filename_info = self.find_ffmpeg_filename_info(job)
|
|
|
|
job_object = {
|
|
'id': job.get('id'),
|
|
'preset': job.get('preset'),
|
|
'download_manager': job.get('download_manager'),
|
|
'worker': job.get('worker'),
|
|
'job': job.get('job'),
|
|
}
|
|
|
|
if filename_info is not None:
|
|
ffmpeg_killed = True
|
|
os.kill(process_pid, signal.SIGINT)
|
|
|
|
try:
|
|
job.get('job').meta['filename_info'] = filename_info
|
|
job.get('job').meta['terminated'] = True
|
|
job.get('job').save_meta()
|
|
|
|
if background_tasks is not None:
|
|
background_tasks.add_task(self.ffmpeg_terminated_file, filename_info=filename_info)
|
|
else:
|
|
self.ffmpeg_terminated_file(filename_info=filename_info)
|
|
|
|
if callable(getattr(ydl_api_hooks, 'post_redis_termination_handler', None)):
|
|
ydl_api_hooks.post_redis_termination_handler(job_object.get('download_manager'), filename_info)
|
|
|
|
|
|
except FileNotFoundError as e:
|
|
logging.getLogger('process_utils').error(e)
|
|
pass
|
|
|
|
logging.getLogger('process_utils').info(f'ffmpeg process killed : {process_pid}')
|
|
|
|
if not ffmpeg_killed:
|
|
send_kill_horse_command(self.redis, job.get('worker').name)
|
|
|
|
if callable(getattr(ydl_api_hooks, 'post_redis_termination_handler', None)):
|
|
ydl_api_hooks.post_redis_termination_handler(job_object.get('download_manager'), None)
|
|
|
|
logging.getLogger('process_utils').info(f"Job stopped on worker {job.get('worker').name}")
|
|
|
|
if inspect.getfullargspec(ydl_api_hooks.post_download_handler).varkw is not None:
|
|
ydl_api_hooks.post_download_handler(job.get('preset'), job.get('download_manager'),
|
|
job.get('download_manager').get_current_config_manager(),
|
|
job.get('job').meta.get('downloaded_files'), job=job)
|
|
else:
|
|
ydl_api_hooks.post_download_handler(job.get('preset'), job.get('download_manager'),
|
|
job.get('download_manager').get_current_config_manager(),
|
|
job.get('job').meta.get('downloaded_files'))
|
|
|
|
return self.sanitize_job(job_object)
|
|
else:
|
|
job = self.find_job_by_id(search_job_id)
|
|
|
|
if job is None:
|
|
return None
|
|
|
|
try:
|
|
job.get('job').cancel()
|
|
job.get('job').delete()
|
|
except rq.exceptions.InvalidJobOperation:
|
|
job.get('job').delete()
|
|
|
|
return self.sanitize_job(job)
|
|
|
|
def terminate_all_active_downloads(self, background_tasks=None):
|
|
if self.redis is None:
|
|
return self.terminate_all_basic_active_downloads(background_tasks=background_tasks)
|
|
else:
|
|
return self.terminate_all_redis_active_downloads(background_tasks=background_tasks)
|
|
|
|
def terminate_all_basic_active_downloads(self, background_tasks=None):
|
|
logging.getLogger('process_utils').info('All active downloads are being terminated')
|
|
|
|
informations = []
|
|
for download in self.get_active_downloads_list():
|
|
pid = download.get('pid')
|
|
informations.append(self.terminate_active_download(pid, background_tasks=background_tasks))
|
|
|
|
return informations
|
|
|
|
def terminate_all_redis_active_downloads(self, background_tasks=None):
|
|
stopped = []
|
|
for worker in self.get_workers_info():
|
|
if worker.get('job') is not None:
|
|
job = worker.get('job').get('job')
|
|
stopped.append(self.terminate_redis_active_download(job.id, background_tasks=background_tasks))
|
|
return stopped
|
|
|
|
def get_active_downloads_list(self):
|
|
if self.redis is None:
|
|
return self.get_basic_active_downloads_list()
|
|
else:
|
|
return self.get_redis_active_downloads_list()
|
|
|
|
def get_basic_active_downloads_list(self):
|
|
children_process = psutil.Process().children(recursive=True)
|
|
|
|
active_downloads_list = []
|
|
for child in children_process:
|
|
active_download = {
|
|
'command_line': f'{child.cmdline()}',
|
|
'filename': self.get_current_download_file_destination(child.cmdline()),
|
|
'pid': child.pid,
|
|
'create_time': child.create_time()
|
|
}
|
|
|
|
active_downloads_list.append(active_download)
|
|
|
|
return active_downloads_list
|
|
|
|
def sanitize_registry(self, registry):
|
|
sanitized = []
|
|
jobs = self.get_jobs_from_registry(registry)
|
|
|
|
for job in jobs:
|
|
sanitized_job = self.sanitize_job(job)
|
|
sanitized.append(sanitized_job)
|
|
|
|
return sanitized
|
|
|
|
def sanitize_job(self, job):
|
|
return {
|
|
'id': job.get('id'),
|
|
'preset': self.__cm.sanitize_config_object_section(job.get('preset')).get_all(),
|
|
'download_manager': job.get('download_manager').get_api_return_object(),
|
|
'job': self.sanitize_job_object(job.get('job')),
|
|
}
|
|
|
|
def sanitize_job_object(self, job):
|
|
if job is None:
|
|
return None
|
|
|
|
sanitize_object = {
|
|
'status': job.get_status(refresh=True),
|
|
'result': job.result,
|
|
'queue_name': job.origin,
|
|
'result_ttl': job.result_ttl,
|
|
'redis_id' : job.id,
|
|
'enqueued_at': job.enqueued_at,
|
|
'started_at': job.started_at,
|
|
'ended_at': job.ended_at,
|
|
'exc_info': job.exc_info,
|
|
'last_heartbeat': job.last_heartbeat,
|
|
'worker_name': job.worker_name,
|
|
'meta': job.meta
|
|
}
|
|
|
|
return sanitize_object
|
|
|
|
def sanitize_workers_list(self, workers):
|
|
sanitized_workers = []
|
|
for worker in workers:
|
|
sanitizer_worker = {
|
|
'name': worker.get('name'),
|
|
'hostname': worker.get('hostname'),
|
|
'pid': worker.get('pid'),
|
|
'state': worker.get('state'),
|
|
'last_heartbeat': worker.get('last_heartbeat'),
|
|
'birth_date': worker.get('birth_date'),
|
|
'successful_job_count': worker.get('successful_job_count'),
|
|
'failed_job_count': worker.get('failed_job_count'),
|
|
'total_working_time': worker.get('total_working_time'),
|
|
'job': None,
|
|
}
|
|
|
|
current_job = worker.get('current_job')
|
|
if current_job is not None:
|
|
sanitizer_worker['job'] = self.sanitize_job({
|
|
'id': current_job.id,
|
|
'preset': current_job.args[0],
|
|
'download_manager': current_job.args[1],
|
|
'job': current_job
|
|
})
|
|
|
|
sanitized_workers.append(sanitizer_worker)
|
|
|
|
return sanitized_workers
|
|
|
|
def get_redis_active_downloads_list(self):
|
|
return {
|
|
'started_job': self.sanitize_registry('started_job'),
|
|
'pending_job': self.sanitize_registry('pending_job')
|
|
}
|
|
|
|
def is_a_child_process(self, pid):
|
|
children_process = psutil.Process().children(recursive=True)
|
|
|
|
is_child = False
|
|
|
|
for child in children_process:
|
|
if child.pid == pid:
|
|
is_child = True
|
|
|
|
return is_child
|
|
|
|
def get_child_object(self, pid):
|
|
children_process = psutil.Process().children(recursive=True)
|
|
|
|
for child in children_process:
|
|
if child.pid == pid:
|
|
return child
|
|
|
|
return None
|
|
|
|
def get_jobs_from_registry(self, registry):
|
|
jobs = []
|
|
|
|
for job_id in self.registries.get(registry).get_job_ids():
|
|
try:
|
|
job = Job.fetch(job_id, connection=self.redis)
|
|
|
|
jobs.append({
|
|
'id': job.id,
|
|
'registry': registry,
|
|
'preset': job.args[0],
|
|
'download_manager': job.args[1],
|
|
'job': job
|
|
})
|
|
except rq.exceptions.NoSuchJobError:
|
|
pass
|
|
|
|
return jobs
|
|
|
|
def clear_registry(self, registry):
|
|
cleared_jobs_ids = []
|
|
|
|
for job_id in self.registries.get(registry).get_job_ids():
|
|
try:
|
|
job = Job.fetch(job_id, connection=self.redis)
|
|
cleared_jobs_ids.append(job.id)
|
|
|
|
job.cancel()
|
|
except rq.exceptions.InvalidJobOperation:
|
|
job.delete()
|
|
except rq.exceptions.NoSuchJobError:
|
|
pass
|
|
|
|
return cleared_jobs_ids
|
|
|
|
def clear_all_but_pending_and_started(self):
|
|
self.clear_registry('finished_job')
|
|
self.clear_registry('failed_job')
|
|
self.clear_registry('deferred_job')
|
|
self.clear_registry('scheduled_job')
|
|
self.clear_registry('canceled_job')
|
|
|
|
def get_all_jobs(self):
|
|
jobs = {}
|
|
for registry in self.registries:
|
|
jobs[registry] = self.get_jobs_from_registry(registry)
|
|
|
|
return jobs
|
|
|
|
    def get_workers_info(self):
        """Describe every RQ worker attached to the queue.

        Each entry holds the worker's stats plus, when the worker is busy, a
        'job' sub-dict describing its current job. Idle workers have NO 'job'
        key at all (callers use .get('job')).

        Side effect: for busy workers, the current job's meta gets a
        'filename_info' entry with the in-progress ffmpeg destination.
        NOTE(review): the meta change is not persisted here (no save_meta()),
        and the returned ffmpeg pid is discarded — confirm this is intended.
        """
        workers = []

        for worker in Worker.all(queue=self.queue):
            worker_object = {
                'name': worker.name,
                'hostname': worker.hostname,
                'pid': worker.pid,
                'queues': worker.queues,
                'state': worker.state,
                'current_job': worker.get_current_job(),
                'last_heartbeat': worker.last_heartbeat,
                'birth_date': worker.birth_date,
                'successful_job_count': worker.successful_job_count,
                'failed_job_count': worker.failed_job_count,
                'total_working_time': worker.total_working_time,
                'worker': worker
            }

            current_job = worker_object.get('current_job')
            if current_job is not None:
                worker_object['job'] = {
                    'id': current_job.id,
                    'preset': current_job.args[0],
                    'download_manager': current_job.args[1],
                    'job': current_job
                }

                # Attach the live ffmpeg destination to the job meta in place.
                pid, current_job.meta['filename_info'] = self.find_ffmpeg_filename_info_by_pid(worker.pid)

            workers.append(worker_object)

        return workers
def find_job_by_id(self, searched_job_id):
|
|
for registry in self.registries:
|
|
for job_id in self.registries.get(registry).get_job_ids():
|
|
if job_id == searched_job_id:
|
|
try:
|
|
job = Job.fetch(job_id, connection=self.redis)
|
|
return {
|
|
'id': job.id,
|
|
'registry': registry,
|
|
'preset': job.args[0],
|
|
'download_manager': job.args[1],
|
|
'job': job
|
|
}
|
|
except rq.exceptions.NoSuchJobError:
|
|
return None
|
|
return None
|
|
|
|
def find_job_with_programmation_end_date(self):
|
|
jobs = []
|
|
for registry in ["pending_job", "started_job"]:
|
|
for job_id in self.registries.get(registry).get_job_ids():
|
|
try:
|
|
job = Job.fetch(job_id, connection=self.redis)
|
|
|
|
if job.meta.get('programmation_end_date') is not None:
|
|
jobs.append({
|
|
'id': job.id,
|
|
'registry': registry,
|
|
'preset': job.args[0],
|
|
'download_manager': job.args[1],
|
|
'job': job
|
|
})
|
|
except rq.exceptions.NoSuchJobError:
|
|
return None
|
|
return jobs
|
|
|
|
def find_job_by_programmation_id(self, programmation_id=None, *args, **kwargs):
|
|
for registry, content in self.programmation_registries.items():
|
|
for job_id in content.get_job_ids():
|
|
job = Job.fetch(job_id, connection=self.redis)
|
|
|
|
if job.meta.get('programmation_id') == programmation_id:
|
|
return {
|
|
'id': job.id,
|
|
'registry': registry,
|
|
'preset': job.args[0],
|
|
'download_manager': job.args[1],
|
|
'job': job
|
|
}
|
|
return None
|
|
|
|
def find_in_running(self, search_job_id):
|
|
for worker in Worker.all(queue=self.queue):
|
|
current_job = worker.get_current_job()
|
|
if current_job is not None and current_job.id == search_job_id:
|
|
return {
|
|
'id': current_job.id,
|
|
'preset': current_job.args[0],
|
|
'download_manager': current_job.args[1],
|
|
'worker': worker,
|
|
'job': current_job
|
|
}
|
|
return None
|
|
|
|
def get_current_download_file_destination(self, cmdline):
|
|
regex = r'\'file:(.*\/)(.*)\''
|
|
|
|
match = re.search(regex, f'{cmdline}')
|
|
|
|
part_filename = f'{match.group(1)}{match.group(2)}'
|
|
filename = part_filename.removesuffix('.part')
|
|
|
|
path = match.group(1)
|
|
filename_stem = pathlib.Path(filename).stem
|
|
extension = pathlib.Path(filename).suffix
|
|
|
|
try:
|
|
file_size = os.path.getsize(part_filename)
|
|
except FileNotFoundError:
|
|
try:
|
|
file_size = os.path.getsize(filename)
|
|
except FileNotFoundError:
|
|
file_size = 0
|
|
|
|
return {
|
|
'part_filename': part_filename,
|
|
'filename': filename,
|
|
'path': path,
|
|
'filename_stem': filename_stem,
|
|
'extension': extension,
|
|
'full_filename': f'{filename_stem}{extension}',
|
|
'file_size': file_size
|
|
}
|
|
|
|
def get_queue_content(self, registry):
|
|
sanitized_jobs = {}
|
|
|
|
if registry == 'all':
|
|
registries = self.get_all_jobs()
|
|
elif registry == 'workers':
|
|
registries = self.sanitize_workers_list(self.get_workers_info())
|
|
return registries
|
|
elif self.registries.get(registry) is None:
|
|
return None
|
|
else:
|
|
registries = {registry: self.get_jobs_from_registry(registry)}
|
|
|
|
for r in registries:
|
|
sanitized_jobs[r] = self.sanitize_registry(r)
|
|
|
|
return sanitized_jobs
|
|
|
|
    def relaunch_failed(self, job_id, user_token=None):
        """Relaunch only the failed videos of a previous job.

        :param job_id: id of the RQ job whose failures should be retried
        :param user_token: forwarded to the new DownloadManager instances
        :return: (status_code, launched downloads) — (404, None) when the job
            id is unknown, (200, list) otherwise
        """
        try:
            job = Job.fetch(job_id, connection=self.redis)
        except rq.exceptions.NoSuchJobError:
            return 404, None

        # Reuse the original preset but fail fast on every video this time.
        preset = job.args[0].get_all()
        preset['ignoreerrors'] = False

        downloads_state = {}
        launched_downloads = []

        downloaded_files = job.meta.get('downloaded_files')
        if downloaded_files is not None:
            downloads_state = download_manager.DownloadManager.get_downloaded_files_info(
                job.meta.get('downloaded_files'))

        for video_id in downloads_state:
            download_object = downloads_state.get(video_id)
            # Only videos with at least one failed download are retried.
            if download_object.get('error_downloads') > 0:
                url = download_object.get('downloads')[0].get('info_dict').get('webpage_url')

                dm = download_manager.DownloadManager(self.__cm, url, None, user_token, {'presets': [preset]},
                                                      ignore_post_security=True, relaunch_failed_mode=True)
                dm.process_downloads()

                launched_downloads.append(dm.get_api_return_object())

        return 200, launched_downloads
    def relaunch_job(self, job_id, user_token):
        """Relaunch a whole job with its original preset and url.

        :param job_id: id of the RQ job to relaunch
        :param user_token: forwarded to the new DownloadManager
        :return: (status_code, api return object) — (404, None) when the job
            id is unknown
        """
        try:
            job = Job.fetch(job_id, connection=self.redis)
        except rq.exceptions.NoSuchJobError:
            return 404, None

        preset = job.args[0].get_all()
        dm = job.args[1]

        # Build a fresh DownloadManager from the stored one's url/programmation.
        dm = download_manager.DownloadManager(self.__cm, dm.url, None, user_token, {'presets': [preset]},
                                              ignore_post_security=True, programmation_id=dm.programmation_id)

        # 400 means the request was rejected; only process valid relaunches.
        if dm.get_api_status_code() != 400:
            dm.process_downloads()

        return dm.get_api_status_code(), dm.get_api_return_object()
def ffmpeg_terminated_file(self, filename_info=None, *args, **kwargs):
|
|
try:
|
|
os.rename(filename_info.get('part_filename'), filename_info.get('filename'))
|
|
except Exception as e:
|
|
logging.getLogger('process_utils').error(f'{filename_info.get("part_filename")} : {e}')
|
|
|
|
return filename_info
|
|
|
|
def update_active_download_metadata(self, id=None, metadata=None, *args, **kwargs):
|
|
found_job = self.find_in_running(search_job_id=id)
|
|
|
|
if found_job is None:
|
|
return None
|
|
|
|
try:
|
|
job = Job.fetch(found_job.get('id'), connection=self.redis)
|
|
except rq.exceptions.NoSuchJobError:
|
|
return None
|
|
|
|
job.meta = merge(job.meta, metadata)
|
|
job.save_meta()
|
|
|
|
return self.sanitize_job(self.find_job_by_id(searched_job_id=id))
|