Мои скрипты

Материал из wolfram
Перейти к навигации Перейти к поиску

Тут я буду аккумулировать мои наработки и автоматизации, в данном случае на python.


Автоматизация выравнивания громкости звуковых дорожек.

Что это из себя представляет. Представим, что мы скачали сериала или фильм, с закадровым переводом. Так как закадровый перевод всегда поверх оригинальной дорожки, получается, что оригинальная дорожка, простыми словами становится тише.

Есть студии которые заглушают дорожку только в местах где идет пере озвучка. Я не разу не встречал случаев когда кто то бы специально заглушал оригинальную озвучку чтобы не снижать громкость остальных звуков. Всё же живём в век технологий.

Для это существует множество методов, но самый популярный и мощный, основании конечно же на ИИ, это


YT-DLP Downloader Script

Простой и мощный Python-скрипт для автоматизации скачивания видео с помощью утилиты yt-dlp. Скрипт автоматически определяет наилучший доступный HLS-поток и загружает его в несколько потоков для максимальной скорости.

Назначение

Скрипт предназначен для упрощения процесса загрузки видео. Его ключевые возможности:

  • Автоматический выбор качества: Самостоятельно находит HLS-поток с самым высоким разрешением.
  • Многопоточная загрузка: Ускоряет скачивание, используя несколько потоков одновременно.
  • Пакетная обработка: Может обрабатывать список ссылок из файла links.txt.
  • Гибкая настройка: Позволяет через аргументы командной строки указать папку для сохранения, количество потоков и режим постобработки.
  • Опциональная постобработка: По умолчанию отключен вызов ffmpeg для исправления контейнера, что ускоряет процесс, но его можно включить при необходимости.

Установка

Перед использованием скрипта убедитесь, что в вашей системе установлены необходимые компоненты.

  1. Python: Требуется Python версии 3.6 или выше.
  2. yt-dlp: Основная утилита для скачивания. Установить можно через pip:
  3. pip install yt-dlp
  4. FFmpeg (Опционально): Необходим, если вы планируете использовать функцию исправления видеофайлов (аргумент --use-fixup).
  5. Сам скрипт: Сохраните исходный код в файл с именем downloader.py.

Использование

Скрипт запускается из командной строки. Основной способ работы — через аргументы.

Источник ссылок

Скрипт сначала ищет файл links.txt в той же директории.

  • Если файл найден и не пуст, скрипт обработает все ссылки из него по очереди. Каждая ссылка должна быть на новой строке.
  • Если файл пуст или отсутствует, скрипт запросит ввести одну ссылку для скачивания в консоли.

Аргументы командной строки

Вы можете настроить работу скрипта с помощью следующих аргументов:

  • -h, --help: Показать справочное сообщение и выйти.
  • -p PATH, --path PATH: Указать путь к папке для сохранения файлов. Если не указан, файлы сохраняются рядом со скриптом.
  • -t THREADS, --threads THREADS: Задать количество потоков для скачивания. По умолчанию: 6.
  • --use-fixup: Включить постобработку видео с помощью ffmpeg. По умолчанию отключена.

Примеры команд

  • Простой запуск (6 потоков, сохранение в текущую папку, без постобработки):
python downloader.py
  • Скачать в 12 потоков в указанную папку:
python downloader.py -t 12 -p "D:\Видео\Новинки"
  • Скачать с включенной постобработкой ffmpeg:
python downloader.py --use-fixup

Исходный код

Показать/скрыть исходный код
import subprocess
import os
import re
import argparse

# --- НАСТРОЙКИ ---
LINKS_FILE = "links.txt"  # Имя файла со ссылками
# -----------------

def get_best_format(url: str) -> str | None:
    """
    Получает список форматов, находит последний ID типа 'hls-<число>' и возвращает его.
    """
    print(f"\n🔎 Получаю список форматов для видео: {url}")
    try:
        command = ["yt-dlp", "-F", url]
        result = subprocess.run(command, capture_output=True, text=True, check=True, encoding='utf-8')
        
        lines = result.stdout.split('\n')
        hls_format_ids = []
        format_regex = re.compile(r"^(?P<id>hls-\d+)\s")

        for line in lines:
            match = format_regex.match(line.strip())
            if match:
                hls_format_ids.append(match.group('id'))

        if not hls_format_ids:
            print("❌ Форматы 'hls-<число>' не найдены для этой ссылки.")
            return None

        best_format_id = hls_format_ids[-1]
        print(f"✅ Найдено лучшее качество (последний в списке 'hls-'): {best_format_id}")
        return best_format_id

    except FileNotFoundError:
        print("❌ ОШИБКА: `yt-dlp` не найден. Убедитесь, что он установлен и доступен в PATH.")
        return None
    except subprocess.CalledProcessError as e:
        print(f"❌ ОШИБКА: yt-dlp вернул ошибку при получении форматов. Возможно, ссылка неверна.")
        print(e.stderr)
        return None
    except Exception as e:
        print(f"❌ Произошла непредвиденная ошибка: {e}")
        return None

def download_video(url: str, format_id: str, threads: int, output_path: str | None, use_fixup: bool):
    """
    Запускает скачивание видео с указанным ID формата, количеством потоков и путем сохранения.
    """
    print(f"🚀 Начинаю загрузку с ID '{format_id}' в {threads} потоков...")
    if use_fixup:
        print("🛠️ Постобработка ffmpeg (fixup) ВКЛЮЧЕНА.")
    else:
        print("🛠️ Постобработка ffmpeg (fixup) ОТКЛЮЧЕНА.")
        
    try:
        command = [
            "yt-dlp",
            "-f", format_id,
            "-N", str(threads),
            url
        ]
        
        if not use_fixup:
            command.extend(["--fixup", "never"])
        
        if output_path:
            if not os.path.exists(output_path):
                print(f"📁 Создаю папку для сохранения: {output_path}")
                os.makedirs(output_path)
            command.extend(["-P", output_path])

        subprocess.run(command, check=True)
        print("✅ Загрузка успешно завершена!")
    except FileNotFoundError:
        print("❌ ОШИБКА: `yt-dlp` не найден. Убедитесь, что он установлен.")
    except subprocess.CalledProcessError:
        print("❌ ОШИБКА: Произошла ошибка во время скачивания.")
    except Exception as e:
        print(f"❌ Произошла непредвиденная ошибка при скачивании: {e}")

def process_url(url: str, threads: int, output_path: str | None, use_fixup: bool):
    """
    Полный процесс обработки одной ссылки: получение формата и скачивание.
    """
    url = url.strip()
    if not url:
        return

    best_format_id = get_best_format(url)
    if best_format_id:
        download_video(url, best_format_id, threads, output_path, use_fixup)


def main():
    """
    Главная функция скрипта.
    """
    parser = argparse.ArgumentParser(
        description="""
Скрипт для автоматического скачивания видео с помощью yt-dlp.

Основная логика работы:
1. Автоматически находит HLS-поток ('hls-<число>') с самым высоким качеством.
2. Скачивает видео в несколько потоков для ускорения процесса.
3. Позволяет гибко настраивать параметры через аргументы командной строки.
4. Источник ссылок: файл 'links.txt' или прямой ввод, если файл пуст.
""",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        '-p', '--path', 
        type=str, 
        help='Полный путь к папке для сохранения скачанных файлов.\nПример: "C:\\Users\\User\\Downloads"\nЕсли аргумент не указан, файлы сохраняются в ту же папку,\nгде находится скрипт.'
    )
    parser.add_argument(
        '-t', '--threads', 
        type=int, 
        default=6, 
        help='Количество одновременных потоков для скачивания каждого файла.\nБольше потоков может ускорить загрузку, но увеличивает нагрузку на сеть.\n(По умолчанию: 6)'
    )
    parser.add_argument(
        '--use-fixup',
        action='store_true',
        help='Флаг для принудительного включения постобработки видео с помощью ffmpeg.\nЭто может исправить ошибки в контейнере (например, MPEG-TS в MP4),\nно замедляет процесс. По умолчанию эта функция отключена.'
    )
    args = parser.parse_args()

    urls_to_download = []
    if os.path.exists(LINKS_FILE) and os.path.getsize(LINKS_FILE) > 0:
        print(f"📖 Найден файл '{LINKS_FILE}'. Читаю ссылки из него.")
        with open(LINKS_FILE, 'r', encoding='utf-8') as f:
            urls_to_download = [line for line in f if line.strip()]
    
    if not urls_to_download:
        print(f"📝 Файл '{LINKS_FILE}' пуст или не найден.")
        user_url = input("➡️ Введите ссылку на видео для скачивания: ")
        if user_url:
            urls_to_download.append(user_url)

    if not urls_to_download:
        print(" ссылок для скачивания нет. Завершаю работу.")
        return

    print(f"\nНачинаю обработку {len(urls_to_download)} ссылок...")
    print(f"Параметры: Потоков = {args.threads}, Путь = '{args.path or 'Папка со скриптом'}'")
    print("-" * 30)
    
    for url in urls_to_download:
        process_url(url, threads=args.threads, output_path=args.path, use_fixup=args.use_fixup)
        print("-" * 30)
    
    print("🎉 Все задачи выполнены!")


if __name__ == "__main__":
    main()

Advanced Video Trimmer

Скрипт для быстрой и точной обрезки видеофайлов без перекодирования.

Назначение

Advanced Video Trimmer — это утилита командной строки на Python, предназначенная для "безболезненной" обрезки видео. Она идеально подходит для удаления рекламных заставок, пустых фрагментов в начале или конце файла и других подобных задач.

Ключевые особенности:

  • Обрезка без перекодирования: Используется режим -c copy в FFmpeg, что делает процесс практически мгновенным и сохраняет исходное качество видео.
  • Точность до ключевого кадра: Скрипт автоматически находит ближайшие к заданному времени ключевые кадры (I-frames) для начала и конца обрезки. Это гарантирует отсутствие рассинхронизации аудио и видео.
  • Сохранение метаданных: В обрезанный файл записывается подробная информация об операции: исходная длительность, запрошенное время обрезки и фактическое смещение. Если файл обрабатывается повторно, создается новая запись в истории изменений.
  • Гибкое использование: Поддерживается как интерактивный режим (с вопросами пользователю), так и работа через аргументы командной строки для автоматизации.

Установка

Скрипт требует наличия на компьютере Python и FFmpeg.

  1. Python 3: Убедитесь, что у вас установлен Python. Вы можете скачать его с официального сайта.
  2. FFmpeg: Утилита FFmpeg должна быть установлена и доступна в системной переменной PATH. Скачать её можно с официального сайта FFmpeg.
  3. Сохраните скрипт: Сохраните исходный код скрипта в файл, например, advanced_trim_video.py. Никаких дополнительных библиотек Python устанавливать не нужно.

Использование

Скрипт можно запускать как в интерактивном режиме, так и с указанием всех параметров через аргументы командной строки.

Интерактивный режим

Это самый простой способ. Просто запустите скрипт без аргументов. Он последовательно задаст вопросы:

py advanced_trim_video.py

Скрипт спросит, сколько секунд отрезать от начала и от конца, после чего обработает все видеофайлы в текущем каталоге и сохранит результат в подпапку out.

Аргументы командной строки

Для автоматизации и более сложного использования можно передавать параметры при запуске.

py advanced_trim_video.py -s <СТАРТ> -e <КОНЕЦ> -i <ВХОДНОЙ_КАТАЛОГ> -o <ВЫХОДНОЙ_КАТАЛОГ>
  • -s, --start: Сколько секунд отрезать от начала видео.
  • -e, --end: Сколько секунд отрезать от конца видео.
  • -i, --input: Путь к каталогу с исходными файлами (по умолчанию: текущий каталог).
  • -o, --output: Путь для сохранения результатов (по умолчанию: подкаталог out).

Примеры

  • Обрезать 15.5 секунд в начале и 5 в конце у видео в текущей папке:*
py advanced_trim_video.py -s 15.5 -e 5
  • Взять видео из "D:\Movies", обрезать 10 секунд в начале и сохранить в "F:\Trimmed Videos":*
py advanced_trim_video.py -s 10 -i "D:\Movies" -o "F:\Trimmed Videos"

Просмотр метаданных

Чтобы проверить, что информация об обрезке записалась в файл, используйте команду ffprobe:

ffprobe -v error -show_format "путь/к/выходному/файлу.mp4"

В секции [TAG] вы увидите всю историю изменений.

Исходный код

Показать/скрыть исходный код
import os
import subprocess
import json
import shlex
import time
import argparse

def get_metadata(filepath):
    """Считывает существующие метаданные из файла."""
    try:
        command = [
            "ffprobe", "-v", "error", "-print_format", "json",
            "-show_format", filepath
        ]
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True, encoding='utf-8')
        data = json.loads(result.stdout)
        return data.get('format', {}).get('tags', {})
    except (subprocess.CalledProcessError, json.JSONDecodeError, FileNotFoundError):
        return {}

def get_video_info_final(filepath):
    """Мгновенно получает длительность и ключевые кадры, читая индекс пакетов."""
    try:
        probe_command = ["ffprobe", "-v", "error", "-print_format", "json", "-show_format", filepath]
        result = subprocess.run(probe_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True, encoding='utf-8')
        data = json.loads(result.stdout)
        duration = float(data.get('format', {}).get('duration', 0.0))
        if duration == 0.0: return None, None
    except Exception:
        return None, None

    try:
        keyframes_command = ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "packet=pts_time,flags", "-of", "csv=p=0", filepath]
        result = subprocess.run(keyframes_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True, text=True, encoding='utf-8')
        keyframes = [float(parts[0]) for line in result.stdout.strip().split('\n') if (parts := line.split(',')) and len(parts) >= 2 and parts[1].strip().startswith('K')]
        return duration, keyframes
    except Exception:
        return None, None

def find_keyframe_before(keyframes, target_time):
    """Находит ближайший ключевой кадр до указанного времени."""
    if not keyframes: return 0.0
    best_keyframe = 0.0
    for kf_time in keyframes:
        if kf_time <= target_time:
            best_keyframe = kf_time
        else:
            break
    return best_keyframe

def main():
    """Основная функция для запуска скрипта."""
    parser = argparse.ArgumentParser(
        description="""Скрипт для быстрой и точной обрезки видеофайлов без перекодирования.
Он находит ближайшие ключевые кадры для начала и конца обрезки,
чтобы избежать рассинхронизации аудио и видео.
Также записывает подробные метаданные об операции в выходной файл.""",
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument('-s', '--start', type=float, help='Сколько секунд отрезать от НАЧАЛА видео.')
    parser.add_argument('-e', '--end', type=float, help='Сколько секунд отрезать от КОНЦА видео.')
    parser.add_argument('-i', '--input', type=str, help='Путь к каталогу с исходными видеофайлами.\nПо умолчанию: текущий каталог.')
    parser.add_argument('-o', '--output', type=str, help='Путь к каталогу для сохранения результатов.\nПо умолчанию: подкаталог "out" в исходном каталоге.')
    
    args = parser.parse_args()

    # --- Интерактивный ввод, если аргументы не указаны ---
    start_trim = args.start
    if start_trim is None:
        try:
            raw_input = input("▶️  Сколько секунд отрезать от НАЧАЛА? (по умолчанию: 0): ")
            start_trim = float(raw_input) if raw_input else 0.0
        except ValueError:
            print("❗ Некорректный ввод. Используется значение 0.")
            start_trim = 0.0

    end_trim = args.end
    if end_trim is None:
        try:
            raw_input = input("◀️  Сколько секунд отрезать от КОНЦА? (по умолчанию: 0): ")
            end_trim = float(raw_input) if raw_input else 0.0
        except ValueError:
            print("❗ Некорректный ввод. Используется значение 0.")
            end_trim = 0.0

    # --- Определение путей ---
    source_dir = args.input or os.getcwd()
    if not os.path.isdir(source_dir):
        print(f"❌ Ошибка: Указанный входной каталог не существует: {source_dir}")
        return
        
    output_dir = args.output or os.path.join(source_dir, "out")
    os.makedirs(output_dir, exist_ok=True)
    
    print(f"\n▶️  Начинаю работу в каталоге: {source_dir}")
    print(f"✂️  Настройки: отрезать ~{start_trim} сек. в начале и ~{end_trim} сек. в конце.")
    print(f"📂 Результаты будут сохранены в: {output_dir}\n")

    video_extensions = ('.mp4', '.mkv', '.avi', '.mov', '.flv', '.webm', '.wmv', '.mpeg', '.mpg')
    files_in_dir = [f for f in os.listdir(source_dir) if os.path.isfile(os.path.join(source_dir, f))]

    if not any(f.lower().endswith(video_extensions) for f in files_in_dir):
        print("❌ В текущем каталоге не найдено видеофайлов для обработки.")
        return

    for filename in files_in_dir:
        if filename.lower().endswith(video_extensions):
            source_path = os.path.join(source_dir, filename)
            output_path = os.path.join(output_dir, filename)

            print(f"--- 🎬 Обработка файла: {filename} ---")
            
            duration, keyframes = get_video_info_final(source_path)
            
            if duration is None or not keyframes:
                print(f"🔴 Не удалось проанализировать файл или найти ключевые кадры. Пропускаю.\n")
                continue
            
            print(f"🕒 Исходная длительность: {duration:.3f} секунд.")

            actual_start_time = find_keyframe_before(keyframes, start_trim)
            desired_end_time = duration - end_trim
            actual_end_time = find_keyframe_before(keyframes, desired_end_time)
            
            if actual_start_time >= actual_end_time:
                print(f"🟡 Внимание: Видео слишком короткое для обрезки. Пропускаю.\n")
                continue
            
            new_duration = actual_end_time - actual_start_time
            print(f"   ⏱️ Новая длительность: {new_duration:.3f} сек.")
            print(f"   Желаемый старт: {start_trim:.3f} сек. -> ✅ Реальный старт (keyframe): {actual_start_time:.3f} сек.")
            print(f"   Желаемый конец: {desired_end_time:.3f} сек. -> ✅ Реальный конец (keyframe): {actual_end_time:.3f} сек.")

            # --- Формирование метаданных ---
            existing_meta = get_metadata(source_path)
            history_index = 1
            while f"trim_history_{history_index}_source_duration" in existing_meta:
                history_index += 1

            metadata_args = [
                "-metadata", f"comment=Trimmed with advanced_trim_video.py",
                "-metadata", f"trim_history_{history_index}_source_duration={duration:.6f}",
                "-metadata", f"trim_history_{history_index}_start_requested={start_trim:.3f}",
                "-metadata", f"trim_history_{history_index}_start_actual={actual_start_time:.6f}",
                "-metadata", f"trim_history_{history_index}_end_requested={end_trim:.3f}",
                "-metadata", f"trim_history_{history_index}_end_actual={(duration - actual_end_time):.6f}",
            ]

            command = ["ffmpeg", "-ss", str(actual_start_time), "-i", source_path, "-t", str(new_duration), "-fflags", "+genpts", "-c", "copy"]
            
            if output_path.lower().endswith('.mp4'):
                command.extend(["-movflags", "use_metadata_tags"])
            
            command.extend(metadata_args)
            command.extend(["-y", output_path])
            
            print(f"🚀 Выполняю команду обрезки...")
            try:
                subprocess.run(command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                print(f"✅ Успешно! Файл сохранен: {output_path}\n")
            except subprocess.CalledProcessError as e:
                print(f"❗ Произошла ошибка при обработке файла: {e}\n")

    print("🏁 === Обработка всех видеофайлов завершена! ===")

if __name__ == "__main__":
    main()

Утилита для извлечения аудио (extract_audio)

Простая, но мощная утилита для массового извлечения аудиодорожек из видеофайлов в многопоточном режиме.

Назначение

Скрипт предназначен для автоматизации процесса извлечения звука из видео. Он решает следующие задачи:

  • Сканирует указанный каталог на наличие видеофайлов популярных форматов (MKV, MP4, AVI и др.).
  • Извлекает аудиодорожку в отдельный файл.
  • Конвертирует аудио в стандартизированный формат: WAV (PCM 16-бит, 44.1 кГц, стерео).
  • Ускоряет процесс за счет параллельной обработки нескольких файлов одновременно (многопоточность).

Это идеальный инструмент для подготовки аудиоматериалов для дальнейшего анализа, монтажа или архивации.

Установка

Для работы скрипта требуется две вещи:

  1. Python 3.6+: Обычно уже установлен в большинстве современных систем.
  2. FFmpeg: Мощная программа для обработки медиафайлов.
    • Скачайте FFmpeg с официального сайта: ffmpeg.org
    • Важно! Распакуйте архив и добавьте путь к папке `bin` (где находится `ffmpeg.exe`) в системную переменную `PATH`. Это необходимо, чтобы скрипт мог вызывать FFmpeg из командной строки.

Никаких дополнительных Python-библиотек устанавливать не нужно.

Использование

Скрипт запускается из командной строки (терминала).

Параметры запуска

Вы можете посмотреть все доступные команды, выполнив:

python extract_audio_v3.py --help

Основной синтаксис:

python extract_audio_v3.py [-i INPUT_DIR] [-o OUTPUT_DIR] [-t THREADS]
  • -i, --input: Путь к каталогу с вашими видео. Если не указать, скрипт будет искать видео в той же папке, где он сам находится.
  • -o, --output: Путь к каталогу, куда будут сохраняться итоговые `.wav` файлы. Если не указать, будет создана подпапка `audio_out` внутри каталога с видео.
  • -t, --threads: Количество одновременных потоков (файлов для обработки). По умолчанию: 4. Увеличивайте это значение для ускорения работы на мощных процессорах.

Примеры

1. Самый простой запуск
Обработать все видео в текущей папке, используя 4 потока. Результаты будут сохранены в подпапку `audio_out`.
python extract_audio_v3.py
2. Указать папку с видео и увеличить количество потоков до 8
Скрипт обработает видео из `/path/to/my/videos` и сохранит аудио в `/path/to/my/videos/audio_out`.
python extract_audio_v3.py --input "/path/to/my/videos" --threads 8
3. Полный контроль
указать исходную и конечную папки
Взять видео из одного места, а результаты положить в совершенно другое.
python extract_audio_v3.py -i "D:\Movies" -o "D:\Extracted_Audio"

Исходный код

► Нажмите, чтобы показать/скрыть исходный код (extract_audio_v3.py)
import os
import subprocess
import shlex
import argparse
import threading
from queue import Queue

# --- НАСТРОЙКИ ПО УМОЛЧАНИЮ ---
DEFAULT_OUTPUT_SUBDIR = "audio_out"
DEFAULT_THREADS = 4
# --- КОНЕЦ НАСТРОЕК ---

def create_argument_parser():
    """Создает и настраивает парсер аргументов командной строки."""
    parser = argparse.ArgumentParser(
        description="""Скрипт для многопоточного извлечения аудиодорожек из видеофайлов.
                       Он находит все видео в указанном каталоге, извлекает из них звук
                       и сохраняет в формате WAV (стерео, 44.1 кГц, 16 бит).""",
        epilog="""Примеры использования:
  1. Обработать видео в 4 потока и сохранить в 'audio_out':
     python extract_audio_v3.py

  2. Указать папку с видео и использовать 8 потоков:
     python extract_audio_v3.py -i "/path/to/videos" -t 8

  3. Указать папку с видео и папку для сохранения результатов:
     python extract_audio_v3.py -i "/path/to/videos" -o "/path/to/audio_files"
""",
        formatter_class=argparse.RawTextHelpFormatter
    )

    parser.add_argument(
        '-i', '--input',
        type=str,
        default='.',
        help='Путь к каталогу с видеофайлами.\nПо умолчанию: текущий каталог.'
    )

    parser.add_argument(
        '-o', '--output',
        type=str,
        default=None,
        help='Путь для сохранения извлеченных аудиофайлов.\nПо умолчанию: подкаталог "audio_out" в исходном каталоге.'
    )
    
    parser.add_argument(
        '-t', '--threads',
        type=int,
        default=DEFAULT_THREADS,
        help=f'Количество одновременных потоков для обработки.\nПо умолчанию: {DEFAULT_THREADS}.'
    )

    return parser

def worker(q, output_dir):
    """
    Функция-обработчик для потоков. Берет файл из очереди и извлекает аудио.
    """
    while not q.empty():
        try:
            # Получаем задачу из очереди
            filename, source_path = q.get()
            
            base_filename = os.path.splitext(filename)[0]
            output_path = os.path.join(output_dir, f"{base_filename}.wav")

            print(f"--- [Поток {threading.get_ident()}] 🎵 Обработка файла: {filename} ---")

            command = [
                "ffmpeg",
                "-i", source_path,
                "-vn",
                "-acodec", "pcm_s16le",
                "-ac", "2",
                "-ar", "44100",
                "-af", "asetpts=PTS-STARTPTS",
                "-y",
                output_path
            ]
            
            # Используем PIPE, чтобы скрыть подробный вывод ffmpeg
            result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='utf-8')
            print(f"✅ [Поток {threading.get_ident()}] Успешно! Файл сохранен: {output_path}")

        except subprocess.CalledProcessError as e:
            error_message = e.stderr.strip()
            print(f"❗ [Поток {threading.get_ident()}] Ошибка FFmpeg при обработке {filename}: {error_message}")
        except Exception as e:
            print(f"❗ [Поток {threading.get_ident()}] Неизвестная ошибка при обработке {filename}: {e}")
        finally:
            # Сообщаем очереди, что задача завершена
            q.task_done()


def extract_audio_from_videos(source_dir, output_dir, num_threads):
    """
    Находит видео, создает очередь и запускает потоки для извлечения аудио.
    """
    source_dir = os.path.abspath(source_dir)

    if output_dir is None:
        output_dir = os.path.join(source_dir, DEFAULT_OUTPUT_SUBDIR)
    
    os.makedirs(output_dir, exist_ok=True)

    print(f"▶️  Начинаю работу в каталоге: {source_dir}")
    print(f"📂 Результаты будут сохранены в: {output_dir}")
    print(f"⚙️  Количество потоков: {num_threads}\n")

    video_extensions = ('.mp4', '.mkv', '.avi', '.mov', '.flv', '.webm', '.wmv')
    
    try:
        files_in_dir = [f for f in os.listdir(source_dir) if os.path.isfile(os.path.join(source_dir, f))]
    except FileNotFoundError:
        print(f"❌ Ошибка: Указанный каталог не найден: {source_dir}")
        return

    video_files_found = [f for f in files_in_dir if f.lower().endswith(video_extensions)]

    if not video_files_found:
        print("❌ В указанном каталоге не найдено видеофайлов для обработки.")
        return
        
    # Создаем очередь и наполняем ее файлами
    q = Queue()
    for filename in video_files_found:
        source_path = os.path.join(source_dir, filename)
        q.put((filename, source_path))

    # Создаем и запускаем потоки
    threads = []
    for _ in range(num_threads):
        thread = threading.Thread(target=worker, args=(q, output_dir), daemon=True)
        thread.start()
        threads.append(thread)
        
    # Ждем, пока очередь не будет полностью обработана
    q.join()

    print("\n🏁 === Извлечение аудио из всех файлов завершено! ===")

if __name__ == "__main__":
    try:
        parser = create_argument_parser()
        args = parser.parse_args()
        extract_audio_from_videos(args.input, args.output, args.threads)
    except FileNotFoundError:
        print("\n❌ Критическая ошибка: FFmpeg не найден. Убедитесь, что он установлен и доступен в системной переменной PATH.")
    except KeyboardInterrupt:
        print("\n🛑 Операция прервана пользователем.")

Скрипт для пакетного разделения аудио (process_audio.py)

Скрипт для автоматического и параллельного разделения аудиофайлов на вокальные и инструментальные дорожки с использованием библиотеки audio-separator.

Назначение

Этот инструмент предназначен для массовой обработки аудиофайлов из указанной папки. Он выполняет следующие задачи:

  • Сканирует входную директорию на наличие аудиофайлов.
  • Разделяет каждый файл на две отдельные дорожки: вокал и инструментал.
  • Сохраняет результаты в две разные, заранее указанные папки.
  • Использует многопоточность для значительного ускорения процесса обработки большого количества файлов.
  • Автоматически проверяет наличие GPU (NVIDIA CUDA) и соответствующих библиотек (PyTorch, ONNX Runtime) для максимальной производительности.

Установка

Перед использованием скрипта необходимо подготовить окружение.

  1. Установить Python: Убедитесь, что у вас установлен Python 3.8 или новее.
  2. Установить библиотеки: Откройте терминал или командную строку и выполните следующую команду для установки всех необходимых зависимостей:
    • Для использования ускорения на GPU (рекомендуется):
  3. pip install torch torchvision --index-url https://download.pytorch.org/whl/cu128
    pip install onnxruntime-gpu
    pip install audio-separator tqdm
    #
    
    • Для использования только CPU:
  4. pip install torch torchvision torchaudio
    pip install onnxruntime
    pip install audio-separator tqdm
    #
    
  5. (Опционально) Установить NVIDIA CUDA Toolkit: Для работы GPU-версии требуется наличие драйверов NVIDIA и CUDA Toolkit на вашей системе.

Использование

Скрипт запускается из командной строки с указанием обязательных путей к папкам.

Синтаксис

python process_audio.py --input_dir <путь_вход> --vocals_dir <путь_вокал> --instrumentals_dir <путь_инструментал> [опции]

Аргументы

--input_dir (обязательный)
Путь к каталогу, содержащему исходные аудиофайлы для обработки.
--vocals_dir (обязательный)
Путь к каталогу, куда будут сохранены извлеченные вокальные дорожки (в формате WAV).
--instrumentals_dir (обязательный)
Путь к каталогу для сохранения инструментальных дорожек (в формате WAV).
-m, --model (необязательный)
Имя ONNX-модели для разделения. По умолчанию: UVR-MDX-NET-Inst_HQ_5.onnx.
-w, --workers (необязательный)
Количество параллельных потоков для обработки. По умолчанию: 4.

Пример

Следующая команда обработает все аудиофайлы из папки D:\Music\Source, сохранит вокал в D:\Music\Output\Vocals, а инструментал — в D:\Music\Output\Instrumentals, используя 8 потоков и альтернативную модель.

python process_audio.py --input_dir "D:\Music\Source" --vocals_dir "D:\Music\Output\Vocals" --instrumentals_dir "D:\Music\Output\Instrumentals" -w 8 -m "UVR-MDX-NET-Inst_HQ_3.onnx"

Исходный код

Исходный код скрипта process_audio.py
import os
import sys
import argparse
import subprocess
import logging
from pathlib import Path
import concurrent.futures
from tqdm import tqdm

# Добавляем импорты для проверки
try:
    import torch
    import onnxruntime as ort
except ImportError as e:
    print(f"Ошибка: Необходимая библиотека не найдена - {e.name}. Пожалуйста, установите все зависимости.")
    sys.exit(1)

# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def check_environment():
    """
    Проверяет, доступны ли GPU для PyTorch и ONNX Runtime, и выводит статус.
    """
    print("--- Проверка окружения для ускорения на GPU ---")
    
    # 1. Проверка PyTorch
    torch_version = torch.__version__
    cuda_available = torch.cuda.is_available()
    print(f"PyTorch версия: {torch_version}")
    if cuda_available:
        print("✅ PyTorch: CUDA (GPU) доступен.")
    else:
        print("❌ PyTorch: CUDA (GPU) НЕ доступен. Будет использоваться CPU.")

    # 2. Проверка ONNX Runtime
    providers = ort.get_available_providers()
    onnx_gpu_available = 'CUDAExecutionProvider' in providers
    print(f"ONNX Runtime провайдеры: {providers}")
    if onnx_gpu_available:
        print("✅ ONNX Runtime: CUDAExecutionProvider (GPU) доступен.")
    else:
        print("❌ ONNX Runtime: CUDAExecutionProvider (GPU) НЕ найден. Будет использоваться CPU.")
    
    # 3. Итог
    if cuda_available and onnx_gpu_available:
        print("\n👍 Окружение настроено правильно. Ускорение на GPU будет задействовано.")
        print("-" * 45)
        return True
    else:
        print("\n⚠️ Внимание: Не удалось настроить ускорение на GPU. Программа продолжит работу на CPU.")
        print("   Убедитесь, что вы установили GPU-версию PyTorch и onnxruntime-gpu.")
        print("-" * 45)
        return False

def process_file(input_file, vocals_output_dir, instrumentals_output_dir, model_name):
    try:
        if sys.platform == "win32":
            separator_executable = Path(sys.executable).parent / "Scripts" / "audio-separator.exe"
        else:
            separator_executable = Path(sys.executable).parent / "bin" / "audio-separator"

        if not separator_executable.exists():
            logging.warning(f"Не удалось найти {separator_executable}, используется глобальный вызов 'audio-separator'.")
            separator_executable = "audio-separator"

        logging.info(f"[Thread] Начало обработки файла: {input_file.name}")

        command_vocals = [
            str(separator_executable), str(input_file), f"-m={model_name}",
            f"--output_dir={vocals_output_dir}", "--single_stem=vocals", "--output_format=WAV"
        ]
        subprocess.run(command_vocals, check=True, capture_output=True, text=True, encoding='utf-8')

        command_instrumentals = [
            str(separator_executable), str(input_file), f"-m={model_name}",
            f"--output_dir={instrumentals_output_dir}", "--single_stem=instrumental", "--output_format=WAV"
        ]
        subprocess.run(command_instrumentals, check=True, capture_output=True, text=True, encoding='utf-8')

        logging.info(f"[Thread] Файл '{input_file.name}' успешно разделен.")
        return f"Успешно обработан файл: {input_file.name}"

    except subprocess.CalledProcessError as e:
        error_message = e.stderr.strip()
        logging.error(f"Ошибка при обработке файла '{input_file.name}':\n{error_message}")
        raise e

def main():
    check_environment()
    args = parser.parse_args()
    run_separation(args.input_dir, args.vocals_dir, args.instrumentals_dir, args.model, args.workers)

def run_separation(input_dir, vocals_dir, instrumentals_dir, model_name, workers):
    logging.info(f"Начало процесса разделения. Максимальное количество параллельных процессов: {workers}.")
    
    input_path = Path(input_dir)
    vocals_output_path = Path(vocals_dir)
    instrumentals_output_path = Path(instrumentals_dir)

    vocals_output_path.mkdir(parents=True, exist_ok=True)
    instrumentals_output_path.mkdir(parents=True, exist_ok=True)
    
    files_to_process = [f for f in input_path.iterdir() if f.is_file()]

    if not files_to_process:
        logging.warning(f"В каталоге '{input_dir}' не найдено файлов для обработки.")
        return

    logging.info(f"Найдено {len(files_to_process)} файлов для обработки.")
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_file = {
            executor.submit(process_file, audio_file, vocals_output_path, instrumentals_output_path, model_name): audio_file.name 
            for audio_file in files_to_process
        }
        
        for future in tqdm(concurrent.futures.as_completed(future_to_file), total=len(files_to_process), desc="Обработка аудиофайлов"):
            file_name = future_to_file[future]
            try:
                result = future.result()
            except Exception as exc:
                logging.error(f"Задача для '{file_name}' завершилась с ошибкой: {exc}")

    logging.info("Процесс разделения полностью завершен.")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="""
Скрипт для пакетного разделения аудиофайлов на вокальные и инструментальные дорожки.

Этот инструмент использует библиотеку 'audio-separator' для обработки всех файлов
в указанной входной директории. Разделение происходит параллельно для ускорения
процесса. Перед запуском основной логики, скрипт проверяет наличие GPU и
соответствующих библиотек (PyTorch, ONNX Runtime) для возможного ускорения.
""",
        epilog="""
Пример использования:
  python %(prog)s \\
    --input_dir "/path/to/my/music" \\
    --vocals_dir "/path/to/output/vocals" \\
    --instrumentals_dir "/path/to/output/instrumentals" \\
    -m "UVR-MDX-NET-Inst_HQ_3.onnx" \\
    -w 8

Примечание: Для максимальной производительности убедитесь, что у вас установлены
драйверы NVIDIA, CUDA Toolkit, а также 'torch' и 'onnxruntime-gpu' в вашем
Python окружении.
""",
        formatter_class=argparse.RawTextHelpFormatter
    )
    
    parser.add_argument(
        "--input_dir", 
        type=str, 
        required=True, 
        help="""Путь к каталогу, содержащему исходные аудиофайлы для обработки.
Скрипт обработает все файлы (например, .wav, .mp3, .flac) в этой папке."""
    )
    
    parser.add_argument(
        "--vocals_dir", 
        type=str, 
        required=True, 
        help="""Путь к каталогу, куда будут сохранены извлеченные вокальные дорожки.
Если каталог не существует, он будет создан автоматически.
Результат сохраняется в формате WAV."""
    )
    
    parser.add_argument(
        "--instrumentals_dir", 
        type=str, 
        required=True, 
        help="""Путь к каталогу, куда будут сохранены извлеченные инструментальные дорожки.
Если каталог не существует, он будет создан автоматически.
Результат сохраняется в формате WAV."""
    )
    
    parser.add_argument(
        "-m", "--model", 
        type=str, 
        default="UVR-MDX-NET-Inst_HQ_5.onnx", 
        help="""Имя файла модели ONNX, используемой для разделения.
Модель должна быть доступна для библиотеки 'audio-separator'.
Вы можете скачать другие модели и указать их имя здесь.
(По умолчанию: %(default)s)"""
    )
    
    parser.add_argument(
        "-w", "--workers", 
        type=int, 
        default=4, 
        help="""Количество параллельных потоков для одновременной обработки файлов.
Оптимальное значение зависит от мощности вашего CPU и наличия GPU.
Увеличьте это число, если у вас многоядерный процессор.
(По умолчанию: %(default)s)"""
    )
    
    main()