ydl_api_ng/process_utils.py

582 lines
22 KiB
Python

import logging
import rq
from redis import Redis
from rq import Worker, Queue
from rq.command import send_kill_horse_command
from rq.job import Job
import os
import signal
import psutil
import re
import pathlib
import inspect
from mergedeep import merge
import download_manager
try:
from params import ydl_api_hooks
except ImportError as e:
logging.warning(e)
from setup import ydl_api_hooks
class ProcessUtils:
    """Helpers to inspect and control downloads, either as local child
    processes (basic mode) or as RQ jobs backed by Redis."""

    def __init__(self, config_manager, queue_name="ydl_api_ng"):
        """Set up Redis/RQ connections when `_enable_redis` is True in the
        application parameters; otherwise run in process-only mode.

        :param config_manager: application configuration accessor
        :param queue_name: RQ queue name downloads are enqueued on
        """
        self.__cm = config_manager
        self.queue_name = queue_name

        # `_enable_redis` must be explicitly True; `is True` already implies
        # the value is present, so the former `is not None and ... is True`
        # double test was redundant.
        if self.__cm.get_app_params().get('_enable_redis') is True:
            self.redis = Redis(host=self.__cm.get_app_params().get('_redis_host'),
                               port=self.__cm.get_app_params().get('_redis_port'))
            self.queue = Queue(queue_name, connection=self.redis)

            # Registry name -> RQ registry (the queue itself stands in for
            # the "pending" registry).
            self.registries = {'pending_job': self.queue,
                               'started_job': self.queue.started_job_registry,
                               'finished_job': self.queue.finished_job_registry,
                               'failed_job': self.queue.failed_job_registry,
                               'deferred_job': self.queue.deferred_job_registry,
                               'scheduled_job': self.queue.scheduled_job_registry,
                               'canceled_job': self.queue.canceled_job_registry,
                               }

            # Only pending/started jobs are relevant for programmations.
            self.programmation_registries = {
                'pending_job': self.queue,
                'started_job': self.queue.started_job_registry,
            }
        else:
            self.redis = None
            self.queue = None
            self.registries = None
            # Fix: previously left undefined in non-redis mode, so any access
            # raised AttributeError instead of allowing a None check.
            self.programmation_registries = None
def terminate_active_download(self, id, background_tasks=None):
if self.redis is None or (self.redis is not None and self.redis is False):
return self.terminate_basic_active_download(id, background_tasks=background_tasks)
else:
return self.terminate_redis_active_download(id, background_tasks=background_tasks)
def terminate_basic_active_download(self, pid, background_tasks=None):
child = self.get_child_object(int(pid))
if child is not None:
logging.getLogger('process_utils').info(f'PID {pid} will be terminated')
child.terminate()
filename_info = self.get_current_download_file_destination(child.cmdline())
if background_tasks is not None:
background_tasks.add_task(self.ffmpeg_terminated_file, filename_info=filename_info)
else:
self.ffmpeg_terminated_file(filename_info=filename_info)
if callable(getattr(ydl_api_hooks, 'post_termination_handler', None)):
ydl_api_hooks.post_termination_handler(self.__cm, filename_info)
return filename_info
else:
logging.getLogger('process_utils').error(f'PID {pid} does not exist or does not belong to the application')
return None
def find_ffmpeg_filename_info(self, job):
child_process = psutil.Process(job.get('worker').pid).children(recursive=True)
for process in child_process:
if process.name() == "ffmpeg":
info = self.get_current_download_file_destination(process.cmdline())
return process.pid, info
return None, None
def find_ffmpeg_filename_info_by_pid(self, pid):
child_process = psutil.Process(pid).children(recursive=True)
for process in child_process:
if process.name() == "ffmpeg":
info = self.get_current_download_file_destination(process.cmdline())
return process.pid, info
return None, None
def terminate_redis_download_by_programmation_id(self, programmation_id=None, *args, **kwargs):
found_job = self.find_job_by_programmation_id(programmation_id)
self.terminate_redis_active_download(found_job.get('id'))
    def terminate_redis_active_download(self, search_job_id, background_tasks=None):
        """Terminate a download job handled by an RQ worker.

        When the job is currently running, its ffmpeg child process (if any)
        is interrupted with SIGINT so it can finalize its output; otherwise
        the worker horse is killed. Jobs that are not running are canceled
        and deleted from their registry.

        :param search_job_id: RQ job id to terminate
        :param background_tasks: optional task runner used to rename the
            partial file asynchronously
        :return: sanitized job dict, or None when the job cannot be found
        """
        job = self.find_in_running(search_job_id)

        if job is not None:
            ffmpeg_killed = False
            process_pid, filename_info = self.find_ffmpeg_filename_info(job)

            # Snapshot the fields needed after the worker/job is gone.
            job_object = {
                'id': job.get('id'),
                'preset': job.get('preset'),
                'download_manager': job.get('download_manager'),
                'worker': job.get('worker'),
                'job': job.get('job'),
            }

            if filename_info is not None:
                ffmpeg_killed = True
                # SIGINT lets ffmpeg close its output file cleanly.
                os.kill(process_pid, signal.SIGINT)

                try:
                    # Record the termination in the job meta so observers see it.
                    job.get('job').meta['filename_info'] = filename_info
                    job.get('job').meta['terminated'] = True
                    job.get('job').save_meta()

                    if background_tasks is not None:
                        background_tasks.add_task(self.ffmpeg_terminated_file, filename_info=filename_info)
                    else:
                        self.ffmpeg_terminated_file(filename_info=filename_info)

                    if callable(getattr(ydl_api_hooks, 'post_redis_termination_handler', None)):
                        ydl_api_hooks.post_redis_termination_handler(job_object.get('download_manager'), filename_info)
                except FileNotFoundError as e:
                    # Best effort: the part file may already be gone.
                    logging.getLogger('process_utils').error(e)
                    pass

                logging.getLogger('process_utils').info(f'ffmpeg process killed : {process_pid}')

            if not ffmpeg_killed:
                # No ffmpeg child found: stop the job by killing the horse.
                send_kill_horse_command(self.redis, job.get('worker').name)

                if callable(getattr(ydl_api_hooks, 'post_redis_termination_handler', None)):
                    ydl_api_hooks.post_redis_termination_handler(job_object.get('download_manager'), None)

                logging.getLogger('process_utils').info(f"Job stopped on worker {job.get('worker').name}")

                # Call the post-download hook with or without the `job` kwarg
                # depending on whether the hook accepts **kwargs.
                if inspect.getfullargspec(ydl_api_hooks.post_download_handler).varkw is not None:
                    ydl_api_hooks.post_download_handler(job.get('preset'), job.get('download_manager'),
                                                        job.get('download_manager').get_current_config_manager(),
                                                        job.get('job').meta.get('downloaded_files'), job=job)
                else:
                    ydl_api_hooks.post_download_handler(job.get('preset'), job.get('download_manager'),
                                                        job.get('download_manager').get_current_config_manager(),
                                                        job.get('job').meta.get('downloaded_files'))

            return self.sanitize_job(job_object)
        else:
            # Not running: look the job up across registries and cancel it.
            job = self.find_job_by_id(search_job_id)

            if job is None:
                return None

            try:
                job.get('job').cancel()
                job.get('job').delete()
            except rq.exceptions.InvalidJobOperation:
                # Already canceled/finished: deletion alone is enough.
                job.get('job').delete()

            return self.sanitize_job(job)
def terminate_all_active_downloads(self, background_tasks=None):
if self.redis is None:
return self.terminate_all_basic_active_downloads(background_tasks=background_tasks)
else:
return self.terminate_all_redis_active_downloads(background_tasks=background_tasks)
def terminate_all_basic_active_downloads(self, background_tasks=None):
logging.getLogger('process_utils').info('All active downloads are being terminated')
informations = []
for download in self.get_active_downloads_list():
pid = download.get('pid')
informations.append(self.terminate_active_download(pid, background_tasks=background_tasks))
return informations
def terminate_all_redis_active_downloads(self, background_tasks=None):
stopped = []
for worker in self.get_workers_info():
if worker.get('job') is not None:
job = worker.get('job').get('job')
stopped.append(self.terminate_redis_active_download(job.id, background_tasks=background_tasks))
return stopped
def get_active_downloads_list(self):
if self.redis is None:
return self.get_basic_active_downloads_list()
else:
return self.get_redis_active_downloads_list()
def get_basic_active_downloads_list(self):
children_process = psutil.Process().children(recursive=True)
active_downloads_list = []
for child in children_process:
active_download = {
'command_line': f'{child.cmdline()}',
'filename': self.get_current_download_file_destination(child.cmdline()),
'pid': child.pid,
'create_time': child.create_time()
}
active_downloads_list.append(active_download)
return active_downloads_list
def sanitize_registry(self, registry):
sanitized = []
jobs = self.get_jobs_from_registry(registry)
for job in jobs:
sanitized_job = self.sanitize_job(job)
sanitized.append(sanitized_job)
return sanitized
def sanitize_job(self, job):
return {
'id': job.get('id'),
'preset': self.__cm.sanitize_config_object_section(job.get('preset')).get_all(),
'download_manager': job.get('download_manager').get_api_return_object(),
'job': self.sanitize_job_object(job.get('job')),
}
def sanitize_job_object(self, job):
if job is None:
return None
try:
sanitize_object = {
'status': job.get_status(refresh=True),
'result': job.result,
'queue_name': job.origin,
'result_ttl': job.result_ttl,
'redis_id' : job.id,
'enqueued_at': job.enqueued_at,
'started_at': job.started_at,
'ended_at': job.ended_at,
'exc_info': job.exc_info,
'last_heartbeat': job.last_heartbeat,
'worker_name': job.worker_name,
'meta': job.meta
}
except :
return None
return sanitize_object
def sanitize_workers_list(self, workers):
sanitized_workers = []
for worker in workers:
sanitizer_worker = {
'name': worker.get('name'),
'hostname': worker.get('hostname'),
'pid': worker.get('pid'),
'state': worker.get('state'),
'last_heartbeat': worker.get('last_heartbeat'),
'birth_date': worker.get('birth_date'),
'successful_job_count': worker.get('successful_job_count'),
'failed_job_count': worker.get('failed_job_count'),
'total_working_time': worker.get('total_working_time'),
'job': None,
}
current_job = worker.get('current_job')
if current_job is not None:
sanitizer_worker['job'] = self.sanitize_job({
'id': current_job.id,
'preset': current_job.args[0],
'download_manager': current_job.args[1],
'job': current_job
})
sanitized_workers.append(sanitizer_worker)
return sanitized_workers
def get_redis_active_downloads_list(self):
return {
'started_job': self.sanitize_registry('started_job'),
'pending_job': self.sanitize_registry('pending_job')
}
def is_a_child_process(self, pid):
children_process = psutil.Process().children(recursive=True)
is_child = False
for child in children_process:
if child.pid == pid:
is_child = True
return is_child
def get_child_object(self, pid):
children_process = psutil.Process().children(recursive=True)
for child in children_process:
if child.pid == pid:
return child
return None
def get_jobs_from_registry(self, registry):
jobs = []
for job_id in self.registries.get(registry).get_job_ids():
try:
job = Job.fetch(job_id, connection=self.redis)
jobs.append({
'id': job.id,
'registry': registry,
'preset': job.args[0],
'download_manager': job.args[1],
'job': job
})
except rq.exceptions.NoSuchJobError:
pass
return jobs
    def clear_registry(self, registry):
        """Cancel (or delete) every job of a registry.

        :param registry: registry name (key of self.registries)
        :return: list of job ids that were cleared
        """
        cleared_jobs_ids = []

        for job_id in self.registries.get(registry).get_job_ids():
            try:
                job = Job.fetch(job_id, connection=self.redis)
                # The id is recorded before cancel() so it is also counted
                # when cancellation falls back to deletion below.
                cleared_jobs_ids.append(job.id)
                job.cancel()
            except rq.exceptions.InvalidJobOperation:
                # Job not in a cancelable state (e.g. already finished):
                # remove it directly instead.
                job.delete()
            except rq.exceptions.NoSuchJobError:
                # Job vanished between listing and fetch: nothing to clear.
                pass

        return cleared_jobs_ids
def clear_all_but_pending_and_started(self):
self.clear_registry('finished_job')
self.clear_registry('failed_job')
self.clear_registry('deferred_job')
self.clear_registry('scheduled_job')
self.clear_registry('canceled_job')
def get_all_jobs(self):
jobs = {}
for registry in self.registries:
jobs[registry] = self.get_jobs_from_registry(registry)
return jobs
    def get_workers_info(self):
        """Describe every RQ worker attached to the queue.

        :return: list of dicts with the worker's fields, the worker object
            itself and, when the worker is busy, a 'job' entry describing
            its current job
        """
        workers = []

        for worker in Worker.all(queue=self.queue):
            worker_object = {
                'name': worker.name,
                'hostname': worker.hostname,
                'pid': worker.pid,
                'queues': worker.queues,
                'state': worker.state,
                'current_job': worker.get_current_job(),
                'last_heartbeat': worker.last_heartbeat,
                'birth_date': worker.birth_date,
                'successful_job_count': worker.successful_job_count,
                'failed_job_count': worker.failed_job_count,
                'total_working_time': worker.total_working_time,
                'worker': worker
            }

            current_job = worker_object.get('current_job')
            if current_job is not None:
                worker_object['job'] = {
                    'id': current_job.id,
                    'preset': current_job.args[0],
                    'download_manager': current_job.args[1],
                    'job': current_job
                }
                # Attach the file currently written by the worker's ffmpeg
                # child to the job meta; the returned ffmpeg pid is
                # deliberately discarded. NOTE(review): the meta is only
                # updated in memory — save_meta() is not called here.
                pid, current_job.meta['filename_info'] = self.find_ffmpeg_filename_info_by_pid(worker.pid)

            workers.append(worker_object)

        return workers
def find_job_by_id(self, searched_job_id):
for registry in self.registries:
for job_id in self.registries.get(registry).get_job_ids():
if job_id == searched_job_id:
try:
job = Job.fetch(job_id, connection=self.redis)
return {
'id': job.id,
'registry': registry,
'preset': job.args[0],
'download_manager': job.args[1],
'job': job
}
except rq.exceptions.NoSuchJobError:
return None
return None
def find_job_with_programmation_end_date(self):
jobs = []
for registry in ["pending_job", "started_job"]:
for job_id in self.registries.get(registry).get_job_ids():
try:
job = Job.fetch(job_id, connection=self.redis)
if job.meta.get('programmation_end_date') is not None:
jobs.append({
'id': job.id,
'registry': registry,
'preset': job.args[0],
'download_manager': job.args[1],
'job': job
})
except rq.exceptions.NoSuchJobError:
return None
return jobs
def find_job_by_programmation_id(self, programmation_id=None, *args, **kwargs):
for registry, content in self.programmation_registries.items():
for job_id in content.get_job_ids():
job = Job.fetch(job_id, connection=self.redis)
if job.meta.get('programmation_id') == programmation_id:
return {
'id': job.id,
'registry': registry,
'preset': job.args[0],
'download_manager': job.args[1],
'job': job
}
return None
def find_in_running(self, search_job_id):
for worker in Worker.all(queue=self.queue):
current_job = worker.get_current_job()
if current_job is not None and current_job.id == search_job_id:
return {
'id': current_job.id,
'preset': current_job.args[0],
'download_manager': current_job.args[1],
'worker': worker,
'job': current_job
}
return None
def get_current_download_file_destination(self, cmdline):
regex = r'\'file:(.*\/)(.*)\''
match = re.search(regex, f'{cmdline}')
part_filename = f'{match.group(1)}{match.group(2)}'
filename = part_filename.removesuffix('.part')
path = match.group(1)
filename_stem = pathlib.Path(filename).stem
extension = pathlib.Path(filename).suffix
try:
file_size = os.path.getsize(part_filename)
except FileNotFoundError:
try:
file_size = os.path.getsize(filename)
except FileNotFoundError:
file_size = 0
return {
'part_filename': part_filename,
'filename': filename,
'path': path,
'filename_stem': filename_stem,
'extension': extension,
'full_filename': f'{filename_stem}{extension}',
'file_size': file_size
}
    def get_queue_content(self, registry):
        """Return the sanitized content of one registry, all registries, or
        the workers list.

        :param registry: registry name, 'all' or 'workers'
        :return: dict of sanitized jobs keyed by registry name; the
            sanitized workers list for 'workers'; None for an unknown name
        """
        sanitized_jobs = {}

        if registry == 'all':
            registries = self.get_all_jobs()
        elif registry == 'workers':
            registries = self.sanitize_workers_list(self.get_workers_info())
            # Workers are already sanitized: return them directly.
            return registries
        elif self.registries.get(registry) is None:
            return None
        else:
            registries = {registry: self.get_jobs_from_registry(registry)}

        # NOTE(review): only the registry *names* are used below —
        # sanitize_registry re-fetches each registry's jobs, so the job
        # lists gathered above are effectively discarded.
        for r in registries:
            sanitized_jobs[r] = self.sanitize_registry(r)

        return sanitized_jobs
    def relaunch_failed(self, job_id, user_token=None):
        """Relaunch the failed downloads of a finished job.

        Each video of the job whose download errored is re-submitted with
        the same preset ('ignoreerrors' disabled so failures surface).

        :param job_id: RQ job id to inspect
        :param user_token: forwarded to the new download managers
        :return: (404, None) when the job is unknown, otherwise
            (200, list of API return objects of the launched downloads)
        """
        try:
            job = Job.fetch(job_id, connection=self.redis)
        except rq.exceptions.NoSuchJobError:
            return 404, None

        # Reuse the original preset but let errors propagate this time.
        preset = job.args[0].get_all()
        preset['ignoreerrors'] = False

        downloads_state = {}
        launched_downloads = []

        downloaded_files = job.meta.get('downloaded_files')
        if downloaded_files is not None:
            downloads_state = download_manager.DownloadManager.get_downloaded_files_info(
                job.meta.get('downloaded_files'))

        for video_id in downloads_state:
            download_object = downloads_state.get(video_id)

            # Only videos with at least one failed download are relaunched.
            if download_object.get('error_downloads') > 0:
                url = download_object.get('downloads')[0].get('info_dict').get('webpage_url')
                dm = download_manager.DownloadManager(self.__cm, url, None, user_token, {'presets': [preset]},
                                                      ignore_post_security=True, relaunch_failed_mode=True)
                dm.process_downloads()
                launched_downloads.append(dm.get_api_return_object())

        return 200, launched_downloads
    def relaunch_job(self, job_id, user_token):
        """Re-submit a whole job with the same url, preset and programmation.

        :param job_id: RQ job id to relaunch
        :param user_token: forwarded to the new download manager
        :return: (404, None) when the job is unknown, otherwise the new
            manager's (status code, API return object)
        """
        try:
            job = Job.fetch(job_id, connection=self.redis)
        except rq.exceptions.NoSuchJobError:
            return 404, None

        preset = job.args[0].get_all()
        dm = job.args[1]

        # Build a fresh manager from the old one's url/programmation.
        dm = download_manager.DownloadManager(self.__cm, dm.url, None, user_token, {'presets': [preset]},
                                              ignore_post_security=True, programmation_id=dm.programmation_id)

        # 400 means the request was rejected (e.g. failed checks): do not
        # start the downloads in that case, but still report the status.
        if dm.get_api_status_code() != 400:
            dm.process_downloads()

        return dm.get_api_status_code(), dm.get_api_return_object()
def ffmpeg_terminated_file(self, filename_info=None, *args, **kwargs):
try:
os.rename(filename_info.get('part_filename'), filename_info.get('filename'))
except Exception as e:
logging.getLogger('process_utils').error(f'{filename_info.get("part_filename")} : {e}')
return filename_info
    def update_active_download_metadata(self, id=None, metadata=None, *args, **kwargs):
        """Deep-merge extra metadata into a currently running job's meta.

        :param id: RQ job id (must be currently executed by a worker)
        :param metadata: dict merged (deep, via mergedeep) into the job meta
        :return: the sanitized updated job, or None when the job is not
            running or cannot be fetched
        """
        found_job = self.find_in_running(search_job_id=id)

        if found_job is None:
            return None

        # Re-fetch to get a job instance we can safely save meta on.
        try:
            job = Job.fetch(found_job.get('id'), connection=self.redis)
        except rq.exceptions.NoSuchJobError:
            return None

        job.meta = merge(job.meta, metadata)
        job.save_meta()

        return self.sanitize_job(self.find_job_by_id(searched_job_id=id))