Skip to content

第49章 多媒体处理

学习目标

完成本章学习后,你将能够:

  1. 处理音频文件:音频读取、格式转换、音频编辑
  2. 处理视频文件:视频读取、格式转换、视频编辑
  3. 实现音频分析:频谱分析、特征提取、音频识别
  4. 实现视频分析:帧提取、运动检测、视频压缩
  5. 处理图像序列:帧处理、动画生成、GIF制作
  6. 实现流媒体:直播推流、流媒体服务器、实时处理
  7. 进行音频合成:音效生成、语音合成、音乐生成
  8. 实现视频特效:滤镜效果、转场动画、字幕添加

49.1 音频处理

49.1.1 音频基础

python
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any, Tuple
from enum import Enum
import wave
import struct
import math


class AudioFormat(Enum):
    """Audio formats this module can name; each value is the lowercase format string.

    NOTE(review): of these, only WAV is actually read or written by the
    AudioFile class shown in this file.
    """
    WAV = "wav"
    MP3 = "mp3"
    FLAC = "flac"
    OGG = "ogg"


@dataclass
class AudioMetadata:
    """Descriptive properties of an audio file or stream."""
    sample_rate: int = 44100  # sampling frequency in Hz
    channels: int = 2  # number of audio channels
    sample_width: int = 2  # bytes per sample; AudioFile.read fills it from wave's getsampwidth()
    duration: float = 0.0  # length in seconds; AudioFile.read copies it from AudioBuffer.duration
    format: AudioFormat = AudioFormat.WAV
    bitrate: int = 128000  # presumably bits per second -- unused by the code shown here, confirm


@dataclass
class AudioFrame:
    """A chunk of samples tagged with a timestamp and a channel number.

    NOTE(review): nothing in the visible code creates or consumes this type,
    so the units noted below are assumptions to confirm against real callers.
    """
    samples: List[float]
    timestamp: float  # presumably seconds from the start of the stream -- confirm
    channel: int = 0  # presumably a zero-based channel index -- confirm


class AudioBuffer:
    """PCM audio held in memory as a flat list of float samples.

    Multi-channel audio is stored interleaved, one frame after another
    (``[L0, R0, L1, R1, ...]`` for stereo); ``get_channel``, ``to_mono`` and
    ``slice`` all index the list that way. Editing methods never modify the
    receiver's sample list: they return a new buffer, or ``self`` when there
    is nothing to change.
    """

    def __init__(
        self,
        samples: Optional[List[float]] = None,
        sample_rate: int = 44100,
        channels: int = 2
    ):
        """Wrap ``samples`` (interleaved) with its sample rate and channel count."""
        self.samples = samples or []
        self.sample_rate = sample_rate
        self.channels = channels

    @property
    def duration(self) -> float:
        """Playback length in seconds."""
        # One frame holds `channels` interleaved samples, so the frame count
        # (not the raw sample count) is what gets divided by the rate.
        return len(self.samples) / (self.sample_rate * self.channels)

    @property
    def num_samples(self) -> int:
        """Total number of stored samples, counted across all channels."""
        return len(self.samples)

    def get_channel(self, channel: int) -> List[float]:
        """Return the samples of one zero-based channel.

        For mono buffers the internal list itself is returned, not a copy.
        """
        if self.channels == 1:
            return self.samples

        return self.samples[channel::self.channels] if channel >= 0 else [
            self.samples[i]
            for i in range(channel, len(self.samples), self.channels)
        ]

    def to_mono(self) -> "AudioBuffer":
        """Average the channels of every frame into a single channel."""
        if self.channels == 1:
            return self

        step = self.channels
        mono_samples = [
            sum(self.samples[i:i + step]) / step
            for i in range(0, len(self.samples), step)
        ]
        return AudioBuffer(mono_samples, self.sample_rate, 1)

    def normalize(self, target_peak: float = 1.0) -> "AudioBuffer":
        """Scale the audio so its largest absolute sample equals ``target_peak``.

        Empty or all-zero buffers are returned unchanged (as ``self``).
        """
        if not self.samples:
            return self

        max_val = max(abs(s) for s in self.samples)
        if max_val == 0:
            return self

        scale = target_peak / max_val
        return AudioBuffer(
            [s * scale for s in self.samples],
            self.sample_rate,
            self.channels
        )

    def amplify(self, factor: float) -> "AudioBuffer":
        """Multiply every sample by ``factor``."""
        return AudioBuffer(
            [s * factor for s in self.samples],
            self.sample_rate,
            self.channels
        )

    def reverse(self) -> "AudioBuffer":
        """Return the audio played backwards.

        Frames are reversed as units so that each channel stays in its own
        slot; reversing the flat list would swap the channels of interleaved
        audio. Assumes the buffer holds whole frames.
        """
        step = self.channels
        if step <= 1:
            flipped = self.samples[::-1]
        else:
            frames = [
                self.samples[i:i + step]
                for i in range(0, len(self.samples), step)
            ]
            flipped = [s for frame in reversed(frames) for s in frame]

        return AudioBuffer(flipped, self.sample_rate, self.channels)

    def slice(self, start: float, end: float) -> "AudioBuffer":
        """Return the audio between ``start`` and ``end`` (seconds, end exclusive)."""
        start_sample = int(start * self.sample_rate) * self.channels
        end_sample = int(end * self.sample_rate) * self.channels

        return AudioBuffer(
            self.samples[start_sample:end_sample],
            self.sample_rate,
            self.channels
        )

    def concatenate(self, other: "AudioBuffer") -> "AudioBuffer":
        """Append ``other`` after this buffer.

        Raises:
            ValueError: if the sample rates or channel counts differ.
        """
        if self.sample_rate != other.sample_rate:
            raise ValueError("Sample rates must match")
        # Joining interleaved data with a different channel count would
        # scramble the frame layout of everything after the join.
        if self.channels != other.channels:
            raise ValueError("Channel counts must match")

        return AudioBuffer(
            self.samples + other.samples,
            self.sample_rate,
            self.channels
        )

    def fade_in(self, duration: float) -> "AudioBuffer":
        """Ramp the gain linearly from 0 upward over the first ``duration`` seconds."""
        fade_frames = int(duration * self.sample_rate)
        faded = self.samples.copy()

        if fade_frames > 0:
            limit = min(fade_frames * self.channels, len(faded))
            for i in range(limit):
                # Gain is computed per frame so all channels of a frame match.
                faded[i] *= (i // self.channels) / fade_frames

        return AudioBuffer(faded, self.sample_rate, self.channels)

    def fade_out(self, duration: float) -> "AudioBuffer":
        """Ramp the gain linearly down to 0 over the last ``duration`` seconds.

        The ramp mirrors ``fade_in``: the final frame is silent. When the fade
        is longer than the buffer, the tail end of the ramp is applied to the
        whole buffer.
        """
        fade_frames = int(duration * self.sample_rate)
        faded = self.samples.copy()

        if fade_frames > 0:
            total = len(faded)
            first = max(0, total - fade_frames * self.channels)
            for idx in range(first, total):
                frames_from_end = (total - 1 - idx) // self.channels
                faded[idx] *= frames_from_end / fade_frames

        return AudioBuffer(faded, self.sample_rate, self.channels)


class AudioFile:
    """Reads and writes PCM WAV files through the standard ``wave`` module."""

    def __init__(self, filepath: str):
        """Remember the path to read from; nothing is opened until ``read``."""
        self.filepath = filepath
        self.metadata = AudioMetadata()
        self._buffer: Optional[AudioBuffer] = None

    @staticmethod
    def _decode_pcm(raw_data: bytes, sample_width: int) -> List[float]:
        """Convert little-endian PCM bytes to floats in roughly [-1.0, 1.0)."""
        # Decode only whole samples so a truncated file yields the audio that
        # is present instead of failing inside struct.unpack.
        usable = len(raw_data) - (len(raw_data) % sample_width)
        count = usable // sample_width

        if sample_width == 1:
            # 8-bit WAV data is unsigned and centred on 128.
            return [(b - 128) / 128.0 for b in raw_data[:usable]]
        if sample_width == 2:
            values = struct.unpack(f'<{count}h', raw_data[:usable])
            return [v / 32768.0 for v in values]
        if sample_width == 3:
            # struct has no 24-bit code, so assemble each sample by hand.
            return [
                int.from_bytes(raw_data[i:i + 3], 'little', signed=True) / 8388608.0
                for i in range(0, usable, 3)
            ]
        if sample_width == 4:
            values = struct.unpack(f'<{count}i', raw_data[:usable])
            return [v / 2147483648.0 for v in values]

        # Any other width keeps the original fallback of "no samples".
        return []

    def read(self) -> AudioBuffer:
        """Load ``self.filepath`` and return its audio as an ``AudioBuffer``.

        Also fills ``self.metadata`` (channels, sample width, sample rate,
        duration) from the file. 8-, 16-, 24- and 32-bit integer PCM are
        decoded; other sample widths produce an empty buffer.
        """
        with wave.open(self.filepath, 'rb') as wf:
            self.metadata.channels = wf.getnchannels()
            self.metadata.sample_width = wf.getsampwidth()
            self.metadata.sample_rate = wf.getframerate()
            raw_data = wf.readframes(wf.getnframes())

        samples = self._decode_pcm(raw_data, self.metadata.sample_width)

        self._buffer = AudioBuffer(
            samples,
            self.metadata.sample_rate,
            self.metadata.channels
        )
        self.metadata.duration = self._buffer.duration

        return self._buffer

    def write(self, buffer: AudioBuffer, output_path: str) -> None:
        """Write ``buffer`` to ``output_path`` as 16-bit PCM WAV.

        Samples are scaled by 32767 and clipped to the int16 range, so values
        outside [-1.0, 1.0] are hard-clipped rather than wrapped.
        """
        pcm = [
            max(-32768, min(32767, int(s * 32767)))
            for s in buffer.samples
        ]

        with wave.open(output_path, 'wb') as wf:
            wf.setnchannels(buffer.channels)
            wf.setsampwidth(2)
            wf.setframerate(buffer.sample_rate)
            wf.writeframes(struct.pack(f'<{len(pcm)}h', *pcm))


class AudioEffects:
    """Stateless time-domain effects; every method returns a new ``AudioBuffer``.

    Samples are treated as interleaved frames, so delays, filter state and
    interpolation all step in multiples of ``buffer.channels`` and never mix
    one channel into another.
    """

    @staticmethod
    def apply_echo(
        buffer: AudioBuffer,
        delay: float = 0.3,
        decay: float = 0.5
    ) -> AudioBuffer:
        """Add a repeating echo ``delay`` seconds apart, each ``decay`` times quieter."""
        delay_samples = int(delay * buffer.sample_rate) * buffer.channels
        output = buffer.samples.copy()

        # Reading back from `output` feeds each echo into the next one.
        for i in range(delay_samples, len(output)):
            output[i] += output[i - delay_samples] * decay

        return AudioBuffer(output, buffer.sample_rate, buffer.channels)

    @staticmethod
    def apply_reverb(
        buffer: AudioBuffer,
        room_size: float = 0.5,
        damping: float = 0.5
    ) -> AudioBuffer:
        """Add four fixed early reflections of the dry signal.

        Each tap (30, 50, 70 and 110 ms) is mixed in with gain
        ``room_size * (1 - damping)``.
        """
        output = buffer.samples.copy()

        for delay in (0.03, 0.05, 0.07, 0.11):
            delay_samples = int(delay * buffer.sample_rate) * buffer.channels
            for i in range(delay_samples, len(output)):
                # Taps read the untouched input, so reflections do not compound.
                output[i] += buffer.samples[i - delay_samples] * room_size * (1 - damping)

        return AudioBuffer(output, buffer.sample_rate, buffer.channels)

    @staticmethod
    def apply_low_pass(
        buffer: AudioBuffer,
        cutoff: float = 1000.0
    ) -> AudioBuffer:
        """Apply a first-order RC low-pass filter with the given cutoff in Hz.

        An empty buffer is returned as an empty buffer.
        """
        source = buffer.samples
        if not source:
            return AudioBuffer([], buffer.sample_rate, buffer.channels)

        rc = 1.0 / (2 * math.pi * cutoff)
        dt = 1.0 / buffer.sample_rate
        alpha = dt / (rc + dt)

        # The previous output of the same channel sits `step` samples back.
        step = max(buffer.channels, 1)
        output = list(source[:step])
        for i in range(step, len(source)):
            previous = output[i - step]
            output.append(previous + alpha * (source[i] - previous))

        return AudioBuffer(output, buffer.sample_rate, buffer.channels)

    @staticmethod
    def apply_high_pass(
        buffer: AudioBuffer,
        cutoff: float = 1000.0
    ) -> AudioBuffer:
        """Apply a first-order RC high-pass filter with the given cutoff in Hz.

        An empty buffer is returned as an empty buffer.
        """
        source = buffer.samples
        if not source:
            return AudioBuffer([], buffer.sample_rate, buffer.channels)

        rc = 1.0 / (2 * math.pi * cutoff)
        dt = 1.0 / buffer.sample_rate
        alpha = rc / (rc + dt)

        step = max(buffer.channels, 1)
        output = list(source[:step])
        for i in range(step, len(source)):
            output.append(
                alpha * (output[i - step] + source[i] - source[i - step])
            )

        return AudioBuffer(output, buffer.sample_rate, buffer.channels)

    @staticmethod
    def change_speed(
        buffer: AudioBuffer,
        speed: float
    ) -> AudioBuffer:
        """Resample by linear interpolation so playback is ``speed`` times faster.

        The sample rate is kept, so the pitch shifts together with the speed.

        Raises:
            ValueError: if ``speed`` is not positive.
        """
        if speed <= 0:
            raise ValueError("speed must be positive")

        source = buffer.samples
        step = max(buffer.channels, 1)
        frame_count = len(source) // step
        new_frames = int(frame_count / speed)
        output = []

        for n in range(new_frames):
            position = n * speed
            low = min(int(position), frame_count - 1)
            high = min(low + 1, frame_count - 1)
            frac = position - low

            # Interpolate between neighbouring frames of the same channel.
            for c in range(step):
                output.append(
                    source[low * step + c] * (1 - frac) + source[high * step + c] * frac
                )

        return AudioBuffer(output, buffer.sample_rate, buffer.channels)

    @staticmethod
    def change_pitch(
        buffer: AudioBuffer,
        semitones: int
    ) -> AudioBuffer:
        """Shift the pitch by ``semitones`` via resampling.

        This delegates to ``change_speed``, so the duration changes by the
        same factor (``2 ** (semitones / 12)``) as the pitch.
        """
        factor = 2 ** (semitones / 12.0)
        return AudioEffects.change_speed(buffer, factor)

49.1.2 音频分析

python
from typing import List, Tuple
import math


class AudioAnalyzer:
    """Level, silence and spectrum measurements on a mono mix of a buffer."""

    def __init__(self, buffer: "AudioBuffer"):
        """Store the mono version of ``buffer`` (via its ``to_mono`` method)."""
        self.buffer = buffer.to_mono()

    def get_rms(self) -> float:
        """Return the root-mean-square level, or 0.0 for an empty buffer."""
        samples = self.buffer.samples
        if not samples:
            return 0.0

        return math.sqrt(sum(s ** 2 for s in samples) / len(samples))

    def get_peak(self) -> float:
        """Return the largest absolute sample value, or 0.0 for an empty buffer."""
        if not self.buffer.samples:
            return 0.0
        return max(abs(s) for s in self.buffer.samples)

    def get_db(self) -> float:
        """Return the RMS level in dB relative to 1.0 (``-inf`` for silence)."""
        rms = self.get_rms()
        if rms == 0:
            return -float('inf')
        return 20 * math.log10(rms)

    def detect_silence(
        self,
        threshold_db: float = -40.0,
        min_duration: float = 0.1
    ) -> List[Tuple[float, float]]:
        """Find runs of samples quieter than ``threshold_db``.

        Returns ``(start, end)`` pairs in seconds for every run that lasts at
        least ``min_duration`` seconds, including a run that continues to the
        end of the buffer.
        """
        threshold = 10 ** (threshold_db / 20.0)
        rate = self.buffer.sample_rate
        samples = self.buffer.samples
        min_samples = int(min_duration * rate)

        silent_regions = []
        silence_start = None

        for i, sample in enumerate(samples):
            if abs(sample) < threshold:
                if silence_start is None:
                    silence_start = i
            elif silence_start is not None:
                if i - silence_start >= min_samples:
                    silent_regions.append((silence_start / rate, i / rate))
                silence_start = None

        # A run still open here reached the end of the audio; without this
        # flush, trailing silence would never be reported.
        if silence_start is not None and len(samples) - silence_start >= min_samples:
            silent_regions.append((silence_start / rate, len(samples) / rate))

        return silent_regions

    def compute_fft(self, window_size: int = 1024) -> List[Tuple[float, float]]:
        """Return ``(frequency_hz, magnitude)`` pairs for the first window.

        Only the first ``window_size`` samples are analysed, and only the
        lower half of the bins is returned. Despite the name this is a direct
        O(n^2) DFT, which is fine for small windows only.
        """
        samples = self.buffer.samples[:window_size]

        n = len(samples)
        result = []

        for k in range(n // 2):
            real = 0.0
            imag = 0.0

            for t in range(n):
                angle = 2 * math.pi * k * t / n
                real += samples[t] * math.cos(angle)
                imag -= samples[t] * math.sin(angle)

            magnitude = math.sqrt(real ** 2 + imag ** 2) / n
            frequency = k * self.buffer.sample_rate / n

            result.append((frequency, magnitude))

        return result

    def get_spectral_centroid(self) -> float:
        """Return the magnitude-weighted mean frequency in Hz (0.0 if no energy)."""
        weighted_sum = 0.0
        magnitude_sum = 0.0

        for freq, mag in self.compute_fft():
            weighted_sum += freq * mag
            magnitude_sum += mag

        if magnitude_sum == 0:
            return 0.0

        return weighted_sum / magnitude_sum

    def get_zero_crossing_rate(self) -> float:
        """Return sign changes per sample, or 0.0 for an empty buffer."""
        samples = self.buffer.samples
        if not samples:
            return 0.0

        crossings = sum(
            1
            for previous, current in zip(samples, samples[1:])
            if (previous >= 0) != (current >= 0)
        )

        return crossings / len(samples)


class AudioGenerator:
    """Synthesises basic mono waveforms and ADSR envelopes."""

    @staticmethod
    def generate_sine(
        frequency: float,
        duration: float,
        sample_rate: int = 44100,
        amplitude: float = 1.0
    ) -> AudioBuffer:
        """Return ``duration`` seconds of a sine wave at ``frequency`` Hz."""
        num_samples = int(duration * sample_rate)
        samples = [
            amplitude * math.sin(2 * math.pi * frequency * (i / sample_rate))
            for i in range(num_samples)
        ]

        return AudioBuffer(samples, sample_rate, 1)

    @staticmethod
    def generate_square(
        frequency: float,
        duration: float,
        sample_rate: int = 44100,
        amplitude: float = 1.0
    ) -> AudioBuffer:
        """Return ``duration`` seconds of a square wave at ``frequency`` Hz."""
        num_samples = int(duration * sample_rate)

        # Keep the period fractional: truncating it to an int shifts the
        # pitch (44100 / 440 -> 100 samples plays at 441 Hz) and becomes a
        # modulo by zero once the frequency exceeds the sample rate.
        period = sample_rate / frequency
        half_period = period / 2

        samples = [
            amplitude if (i % period) < half_period else -amplitude
            for i in range(num_samples)
        ]

        return AudioBuffer(samples, sample_rate, 1)

    @staticmethod
    def generate_sawtooth(
        frequency: float,
        duration: float,
        sample_rate: int = 44100,
        amplitude: float = 1.0
    ) -> AudioBuffer:
        """Return ``duration`` seconds of a rising sawtooth at ``frequency`` Hz."""
        num_samples = int(duration * sample_rate)
        period = sample_rate / frequency

        samples = [
            2 * amplitude * ((i % period) / period - 0.5)
            for i in range(num_samples)
        ]

        return AudioBuffer(samples, sample_rate, 1)

    @staticmethod
    def generate_noise(
        duration: float,
        sample_rate: int = 44100,
        amplitude: float = 1.0
    ) -> AudioBuffer:
        """Return ``duration`` seconds of uniform noise in [-amplitude, amplitude]."""
        import random
        num_samples = int(duration * sample_rate)
        samples = [random.uniform(-amplitude, amplitude) for _ in range(num_samples)]

        return AudioBuffer(samples, sample_rate, 1)

    @staticmethod
    def generate_envelope(
        attack: float,
        decay: float,
        sustain: float,
        release: float,
        duration: float,
        sample_rate: int = 44100
    ) -> List[float]:
        """Return a linear ADSR gain curve with one value per sample.

        ``attack``, ``decay``, ``release`` and ``duration`` are in seconds;
        ``sustain`` is the gain held between the decay and release stages.
        The release stage occupies the last ``release`` seconds of
        ``duration``.
        """
        num_samples = int(duration * sample_rate)
        envelope = []

        attack_samples = int(attack * sample_rate)
        decay_samples = int(decay * sample_rate)
        release_samples = int(release * sample_rate)
        release_start = num_samples - release_samples

        for i in range(num_samples):
            if i < attack_samples:
                envelope.append(i / attack_samples)
            elif i < attack_samples + decay_samples:
                decay_progress = (i - attack_samples) / decay_samples
                envelope.append(1.0 - (1.0 - sustain) * decay_progress)
            elif i < release_start:
                envelope.append(sustain)
            else:
                # Only reachable when release_samples > 0, so no division by zero.
                release_progress = (i - release_start) / release_samples
                envelope.append(sustain * (1 - release_progress))

        return envelope

49.2 视频处理

49.2.1 视频基础

python
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from PIL import Image
import io


@dataclass
class VideoMetadata:
    """Descriptive properties of a video clip."""
    width: int = 1920  # frame width in pixels
    height: int = 1080  # frame height in pixels
    fps: float = 30.0  # frames per second; VideoBuffer uses it to map time to frame index
    duration: float = 0.0  # length in seconds
    codec: str = "h264"
    bitrate: int = 5000000  # presumably bits per second -- unused by the code shown here, confirm
    frame_count: int = 0  # NOTE(review): VideoBuffer does not keep this in sync with its frame list


@dataclass
class VideoFrame:
    """One decoded frame: a PIL image plus its position in the clip."""
    image: Image.Image
    timestamp: float
    frame_number: int

    def to_bytes(self, format: str = "JPEG") -> bytes:
        """Encode the frame's image and return the encoded bytes.

        ``format`` is any format name accepted by ``Image.save``. When
        encoding as JPEG, images in a mode the JPEG writer rejects (for
        example RGBA or palette images) are converted to RGB first, which
        discards any alpha channel.
        """
        image = self.image
        # These are the modes Pillow's JPEG writer accepts; saving any other
        # mode as JPEG raises OSError.
        if format.upper() == "JPEG" and image.mode not in (
            "1", "L", "RGB", "RGBX", "CMYK", "YCbCr"
        ):
            image = image.convert("RGB")

        buffer = io.BytesIO()
        image.save(buffer, format=format)
        return buffer.getvalue()

    @classmethod
    def from_bytes(cls, data: bytes, timestamp: float, frame_number: int) -> "VideoFrame":
        """Build a frame from encoded image bytes such as ``to_bytes`` produces."""
        image = Image.open(io.BytesIO(data))
        return cls(image=image, timestamp=timestamp, frame_number=frame_number)


class VideoBuffer:
    """An ordered list of frames together with the clip's metadata."""

    def __init__(
        self,
        frames: List[VideoFrame] = None,
        metadata: VideoMetadata = None
    ):
        """Store ``frames`` and ``metadata``, substituting defaults for falsy values."""
        self.frames = frames if frames else []
        self.metadata = metadata if metadata else VideoMetadata()

    @property
    def duration(self) -> float:
        """Clip length in seconds derived from the frame count; 0 when fps is not positive."""
        rate = self.metadata.fps
        if rate > 0:
            return len(self.frames) / rate
        return 0

    def get_frame(self, index: int) -> Optional[VideoFrame]:
        """Return the frame at ``index``, or ``None`` when it is out of range."""
        if index < 0 or index >= len(self.frames):
            return None
        return self.frames[index]

    def get_frame_at_time(self, timestamp: float) -> Optional[VideoFrame]:
        """Return the frame shown at ``timestamp`` seconds, or ``None`` if there is none."""
        return self.get_frame(int(timestamp * self.metadata.fps))

    def slice(self, start: float, end: float) -> "VideoBuffer":
        """Return the frames between ``start`` and ``end`` seconds (end exclusive).

        The new buffer shares this buffer's metadata object.
        """
        rate = self.metadata.fps
        first = int(start * rate)
        last = int(end * rate)
        selected = self.frames[first:last]

        return VideoBuffer(frames=selected, metadata=self.metadata)

    def concatenate(self, other: "VideoBuffer") -> "VideoBuffer":
        """Return this clip followed by ``other``; raises ValueError if the fps differ."""
        if self.metadata.fps != other.metadata.fps:
            raise ValueError("FPS must match")

        joined = self.frames + other.frames
        return VideoBuffer(frames=joined, metadata=self.metadata)


class VideoProcessor:
    """Geometric and timing edits that return a new ``VideoBuffer``.

    The source buffer passed to the constructor is never modified.
    """

    def __init__(self, video: VideoBuffer):
        self.video = video

    def _transform_frames(self, transform) -> List[VideoFrame]:
        """Apply ``transform`` to every frame's image, keeping its timing fields."""
        return [
            VideoFrame(
                image=transform(frame.image),
                timestamp=frame.timestamp,
                frame_number=frame.frame_number
            )
            for frame in self.video.frames
        ]

    def _metadata_with(self, **changes) -> VideoMetadata:
        """Copy the source metadata with the given fields replaced."""
        source = self.video.metadata
        values = dict(
            width=source.width,
            height=source.height,
            fps=source.fps,
            duration=source.duration,
            codec=source.codec,
            bitrate=source.bitrate,
            frame_count=source.frame_count
        )
        values.update(changes)
        return VideoMetadata(**values)

    def resize(self, width: int, height: int) -> VideoBuffer:
        """Scale every frame to ``width`` x ``height`` using Lanczos resampling."""
        frames = self._transform_frames(
            lambda image: image.resize((width, height), Image.LANCZOS)
        )
        # Carry codec/bitrate over instead of silently resetting them.
        metadata = self._metadata_with(
            width=width,
            height=height,
            duration=self.video.duration
        )

        return VideoBuffer(frames=frames, metadata=metadata)

    def crop(
        self,
        left: int,
        top: int,
        right: int,
        bottom: int
    ) -> VideoBuffer:
        """Crop every frame to the box ``(left, top, right, bottom)`` in pixels."""
        frames = self._transform_frames(
            lambda image: image.crop((left, top, right, bottom))
        )
        metadata = self._metadata_with(
            width=right - left,
            height=bottom - top,
            duration=self.video.duration
        )

        return VideoBuffer(frames=frames, metadata=metadata)

    def rotate(self, angle: float) -> VideoBuffer:
        """Rotate every frame by ``angle`` degrees, growing the canvas to fit."""
        frames = self._transform_frames(
            lambda image: image.rotate(angle, expand=True)
        )

        if frames:
            # expand=True changes the image size, so width/height must follow.
            width, height = frames[0].image.size
            metadata = self._metadata_with(width=width, height=height)
        else:
            metadata = self.video.metadata

        return VideoBuffer(frames=frames, metadata=metadata)

    def flip_horizontal(self) -> VideoBuffer:
        """Mirror every frame left-to-right."""
        frames = self._transform_frames(
            lambda image: image.transpose(Image.FLIP_LEFT_RIGHT)
        )
        return VideoBuffer(frames=frames, metadata=self.video.metadata)

    def flip_vertical(self) -> VideoBuffer:
        """Mirror every frame top-to-bottom."""
        frames = self._transform_frames(
            lambda image: image.transpose(Image.FLIP_TOP_BOTTOM)
        )
        return VideoBuffer(frames=frames, metadata=self.video.metadata)

    def change_speed(self, speed: float) -> VideoBuffer:
        """Return the clip playing ``speed`` times faster at the same fps.

        Frames are dropped (``speed > 1``) or repeated (``speed < 1``) and the
        frame rate is left alone, so the result can still be concatenated
        with other clips. Scaling the fps as well would cancel the frame
        resampling and leave the playback time unchanged. Timestamps and
        frame numbers are regenerated for the new timeline.

        Raises:
            ValueError: if ``speed`` is not positive.
        """
        if speed <= 0:
            raise ValueError("speed must be positive")

        source = self.video.frames
        fps = self.video.metadata.fps
        new_count = int(len(source) / speed)

        new_frames = []
        for n in range(new_count):
            # Clamp guards against float rounding on the last index.
            picked = source[min(int(n * speed), len(source) - 1)]
            new_frames.append(VideoFrame(
                image=picked.image,
                timestamp=n / fps if fps > 0 else 0.0,
                frame_number=n
            ))

        metadata = self._metadata_with(
            duration=new_count / fps if fps > 0 else 0.0,
            frame_count=new_count
        )

        return VideoBuffer(frames=new_frames, metadata=metadata)


class VideoEffects:
    """Per-frame colour filters; each returns a new ``VideoBuffer`` sharing the source metadata."""

    @staticmethod
    def apply_grayscale(video: VideoBuffer) -> VideoBuffer:
        """Desaturate every frame while keeping it in RGB mode."""
        converted = [
            VideoFrame(
                image=source.image.convert("L").convert("RGB"),
                timestamp=source.timestamp,
                frame_number=source.frame_number
            )
            for source in video.frames
        ]
        return VideoBuffer(frames=converted, metadata=video.metadata)

    @staticmethod
    def apply_sepia(video: VideoBuffer) -> VideoBuffer:
        """Apply a sepia tone to every frame, one pixel at a time."""
        toned_frames = []

        for source in video.frames:
            canvas = source.image.convert("RGB")
            px = canvas.load()

            for row in range(canvas.height):
                for col in range(canvas.width):
                    red, green, blue = px[col, row]
                    weighted = (
                        int(0.393 * red + 0.769 * green + 0.189 * blue),
                        int(0.349 * red + 0.686 * green + 0.168 * blue),
                        int(0.272 * red + 0.534 * green + 0.131 * blue),
                    )
                    # The weights can sum past 255, so cap each channel.
                    px[col, row] = tuple(min(255, value) for value in weighted)

            toned_frames.append(VideoFrame(
                image=canvas,
                timestamp=source.timestamp,
                frame_number=source.frame_number
            ))

        return VideoBuffer(frames=toned_frames, metadata=video.metadata)

    @staticmethod
    def adjust_brightness(video: VideoBuffer, factor: float) -> VideoBuffer:
        """Scale the brightness of every frame by ``factor`` via ``ImageEnhance.Brightness``."""
        from PIL import ImageEnhance

        brightened = [
            VideoFrame(
                image=ImageEnhance.Brightness(source.image).enhance(factor),
                timestamp=source.timestamp,
                frame_number=source.frame_number
            )
            for source in video.frames
        ]
        return VideoBuffer(frames=brightened, metadata=video.metadata)

    @staticmethod
    def adjust_contrast(video: VideoBuffer, factor: float) -> VideoBuffer:
        """Scale the contrast of every frame by ``factor`` via ``ImageEnhance.Contrast``."""
        from PIL import ImageEnhance

        contrasted = [
            VideoFrame(
                image=ImageEnhance.Contrast(source.image).enhance(factor),
                timestamp=source.timestamp,
                frame_number=source.frame_number
            )
            for source in video.frames
        ]
        return VideoBuffer(frames=contrasted, metadata=video.metadata)


class VideoTransition:
    """Transitions that join two clips into one ``VideoBuffer``."""

    @staticmethod
    def fade_transition(
        video1: VideoBuffer,
        video2: VideoBuffer,
        duration: float
    ) -> VideoBuffer:
        """Cross-fade from the end of ``video1`` into the start of ``video2``.

        ``duration`` is the overlap in seconds, measured at ``video1``'s fps.
        The result reuses ``video1``'s metadata. If the overlap rounds to zero
        frames, or either clip has no frames, the clips are simply joined. If
        a clip is shorter than the overlap, its first frame (``video1``) or
        last frame (``video2``) is held for the part it cannot cover.
        """
        fps = video1.metadata.fps
        transition_frames = int(duration * fps)
        first = video1.frames
        second = video2.frames

        # Nothing to blend. Note that slicing with [:-0] would discard all of
        # video1, so the zero-length case has to be handled separately.
        if transition_frames <= 0 or not first or not second:
            return VideoBuffer(frames=first + second, metadata=video1.metadata)

        lead_in = max(len(first) - transition_frames, 0)
        result_frames = first[:lead_in]

        for i in range(transition_frames):
            alpha = i / transition_frames

            frame1 = first[max(len(first) - transition_frames + i, 0)]
            frame2 = second[min(i, len(second) - 1)]

            blended = Image.blend(frame1.image, frame2.image, alpha)
            result_frames.append(VideoFrame(
                image=blended,
                timestamp=len(result_frames) / fps,
                frame_number=len(result_frames)
            ))

        result_frames.extend(second[transition_frames:])

        return VideoBuffer(frames=result_frames, metadata=video1.metadata)


class SubtitleGenerator:
    """Draws text onto the frames of a clip."""

    def __init__(self, video: VideoBuffer):
        self.video = video

    def add_text(
        self,
        text: str,
        start_time: float,
        end_time: float,
        position: Tuple[int, int] = (0, 0),
        font_size: int = 24,
        color: Tuple[int, int, int] = (255, 255, 255)
    ) -> VideoBuffer:
        """Return a copy of the clip with ``text`` drawn between two times.

        Frames whose index lies between ``start_time`` and ``end_time``
        (seconds, both ends inclusive) are copied and drawn on; all other
        frames are reused as they are. ``position`` is the pixel coordinate
        passed to ``ImageDraw.text``. The font is ``arial.ttf`` at
        ``font_size``, falling back to Pillow's default font when that file
        cannot be loaded.
        """
        from PIL import ImageDraw, ImageFont

        fps = self.video.metadata.fps
        start_frame = int(start_time * fps)
        end_frame = int(end_time * fps)

        # Load the font once rather than once per frame, and only fall back
        # for font-loading failures instead of swallowing every exception.
        try:
            font = ImageFont.truetype("arial.ttf", font_size)
        except (OSError, ImportError):
            font = ImageFont.load_default()

        result_frames = []
        for i, frame in enumerate(self.video.frames):
            if not (start_frame <= i <= end_frame):
                result_frames.append(frame)
                continue

            # Draw on a copy so the source clip's image stays untouched.
            image = frame.image.copy()
            ImageDraw.Draw(image).text(position, text, fill=color, font=font)

            result_frames.append(VideoFrame(
                image=image,
                timestamp=frame.timestamp,
                frame_number=frame.frame_number
            ))

        return VideoBuffer(frames=result_frames, metadata=self.video.metadata)

49.3 知识图谱

49.3.1 多媒体处理技术栈

┌─────────────────────────────────────────────────────────────────────┐
│                      多媒体处理技术架构                               │
├─────────────────────────────────────────────────────────────────────┤
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                      应用层 (Application)                     │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 视频编辑  │ │ 音频处理  │ │ 图像处理  │ │ 流媒体   │       │   │
│  │  │ MoviePy │ │ PyAudio │ │ Pillow  │ │ FFmpeg  │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      处理层 (Processing)                      │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 编解码   │ │ 滤镜效果  │ │ 特征提取  │ │ 格式转换  │       │   │
│  │  │ Codec   │ │ Filter  │ │ Feature │ │ Convert │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      分析层 (Analysis)                        │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 频谱分析  │ │ 音频识别  │ │ 视频检测  │ │ 内容分析  │       │   │
│  │  │ FFT     │ │ ASR     │ │ Object  │ │ NLP     │       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                │                                    │
│  ┌─────────────────────────────┴───────────────────────────────┐   │
│  │                      存储层 (Storage)                         │   │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐       │   │
│  │  │ 音频格式  │ │ 视频格式  │ │ 图像格式  │ │ 流协议   │       │   │
│  │  │ MP3/WAV │ │ MP4/MKV │ │ JPEG/PNG│ │ HLS/DASH│       │   │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘       │   │
│  └─────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘

49.3.2 音视频处理流程

┌─────────────────────────────────────────────────────────────────────┐
│                      音视频处理工作流程                              │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌──────────┐                                                      │
│   │ 输入源   │ ─── 文件 / 流 / 设备                                 │
│   └────┬─────┘                                                      │
│        │                                                            │
│        ▼                                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    解封装 (Demux)                         │     │
│   │  ┌──────────┐ ┌──────────┐                               │     │
│   │  │ 视频流   │ │ 音频流   │                               │     │
│   │  └────┬─────┘ └────┬─────┘                               │     │
│   └────────┼───────────┼─────────────────────────────────────┘     │
│            │           │                                            │
│            ▼           ▼                                            │
│   ┌────────────┐ ┌────────────┐                                    │
│   │ 视频解码   │ │ 音频解码   │                                    │
│   │ H.264/HEVC│ │ AAC/MP3   │                                    │
│   └─────┬──────┘ └─────┬──────┘                                    │
│         │              │                                            │
│         ▼              ▼                                            │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    处理 (Processing)                      │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ 滤镜     │ │ 特效     │ │ 编辑     │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│         │              │                                            │
│         ▼              ▼                                            │
│   ┌────────────┐ ┌────────────┐                                    │
│   │ 视频编码   │ │ 音频编码   │                                    │
│   └─────┬──────┘ └─────┬──────┘                                    │
│         │              │                                            │
│         └──────┬───────┘                                            │
│                ▼                                                    │
│   ┌──────────────────────────────────────────────────────────┐     │
│   │                    封装 (Mux)                             │     │
│   │  ┌──────────┐ ┌──────────┐ ┌──────────┐                 │     │
│   │  │ MP4     │ │ MKV     │ │ WebM    │                 │     │
│   │  └──────────┘ └──────────┘ └──────────┘                 │     │
│   └──────────────────────────────────────────────────────────┘     │
│                │                                                    │
│                ▼                                                    │
│   ┌──────────┐                                                      │
│   │ 输出     │ ─── 文件 / 流 / 播放                                 │
│   └──────────┘                                                      │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

49.4 技术选型指南

49.4.1 音频处理库选型

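| 库 | 功能 | 性能 | 学习曲线 | 推荐指数 |
|---|---|---|---|---|
| PyAudio | 音频I/O | | | ★★★★★ |
| librosa | 音频分析 | | | ★★★★★ |
| soundfile | 文件读写 | | | ★★★★☆ |
| pydub | 音频处理 | | | ★★★★☆ |
| pedalboard | 音频效果 | | | ★★★★☆ |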

49.4.2 视频处理库选型

功能性能学习曲线推荐指数
MoviePy视频编辑★★★★★
OpenCV视频处理极高★★★★★
FFmpeg编解码极高★★★★★
imageio图像I/O★★★★☆
vidgear视频捕获★★★★☆

49.4.3 多媒体格式选型

类型格式压缩率质量兼容性
音频MP3极高
音频AAC极高
音频FLAC无损极高
视频H.264极高
视频H.265极高
视频VP9

49.5 常见问题与解决方案

49.5.1 音频处理常见问题

python
import numpy as np
from typing import Tuple, List, Optional
from dataclasses import dataclass

@dataclass
class AudioConfig:
    """Audio configuration."""
    sample_rate: int = 44100  # samples per second; AudioProcessor.apply_fade uses it to turn seconds into samples
    channels: int = 2
    bit_depth: int = 16  # presumably bits per sample -- not read by the visible code, confirm
    buffer_size: int = 1024  # presumably a block size in samples -- not read by the visible code, confirm


class AudioProcessor:
    """Simple NumPy-based audio transformations.

    No method modifies the caller's array in place.  Multi-dimensional
    input is treated as having time on axis 0 (e.g. ``(samples, channels)``)
    -- TODO confirm this layout against the callers.
    """

    def __init__(self, config: Optional["AudioConfig"] = None):
        """Store *config*, falling back to a default ``AudioConfig``."""
        self.config = config or AudioConfig()

    def normalize(self, audio: np.ndarray, target_db: float = -3.0) -> np.ndarray:
        """Scale *audio* so its absolute peak lands on *target_db*.

        Args:
            audio: Samples to scale.
            target_db: Desired peak level in dB relative to an amplitude
                of 1.0.

        Returns:
            A new, scaled array.  Empty input is returned as an empty copy.
        """
        if audio.size == 0:
            # np.max raises on empty input and there is nothing to scale.
            return audio.copy()

        # The epsilon keeps log10 finite for an all-zero signal.
        current_db = 20 * np.log10(np.max(np.abs(audio)) + 1e-10)
        gain = 10 ** ((target_db - current_db) / 20)
        return audio * gain

    def remove_noise(
        self,
        audio: np.ndarray,
        noise_threshold: float = 0.02
    ) -> np.ndarray:
        """Zero every sample whose magnitude is below *noise_threshold*.

        Returns:
            A copy of *audio* with the quiet samples set to 0.
        """
        noise_mask = np.abs(audio) < noise_threshold
        cleaned = audio.copy()
        cleaned[noise_mask] = 0
        return cleaned

    def apply_fade(
        self,
        audio: np.ndarray,
        fade_in: float = 0.1,
        fade_out: float = 0.1
    ) -> np.ndarray:
        """Apply a linear fade-in and fade-out.

        Args:
            audio: Samples with time on axis 0.
            fade_in: Fade-in length; multiplied by ``config.sample_rate``
                to obtain a sample count.
            fade_out: Fade-out length, converted the same way.

        Returns:
            A new array.  Integer input is converted to ``float64`` because
            the fade gains are fractional.  A fade longer than the clip is
            compressed to the clip length instead of raising.
        """
        sample_rate = self.config.sample_rate
        n_samples = len(audio)
        # Clamp so the ramp can never be longer than the slice it scales.
        fade_in_samples = min(int(fade_in * sample_rate), n_samples)
        fade_out_samples = min(int(fade_out * sample_rate), n_samples)

        if np.issubdtype(audio.dtype, np.inexact):
            result = audio.copy()
        else:
            # In-place multiplication by a float ramp is not allowed on
            # integer arrays, so work on a float copy.
            result = audio.astype(np.float64)

        # Shape (n, 1, ..., 1) lets the ramp broadcast over trailing axes
        # such as channels.
        ramp_shape = (-1,) + (1,) * (result.ndim - 1)

        if fade_in_samples > 0:
            fade_in_curve = np.linspace(0, 1, fade_in_samples)
            result[:fade_in_samples] *= fade_in_curve.reshape(ramp_shape)

        if fade_out_samples > 0:
            fade_out_curve = np.linspace(1, 0, fade_out_samples)
            result[-fade_out_samples:] *= fade_out_curve.reshape(ramp_shape)

        return result

    def change_speed(
        self,
        audio: np.ndarray,
        speed: float = 1.0
    ) -> np.ndarray:
        """Change playback speed by picking every *speed*-th sample.

        Fractional positions are truncated to the sample below; no
        interpolation is performed.

        Args:
            audio: Samples with time on axis 0.
            speed: Step between picked samples; must be positive.

        Returns:
            *audio* itself when ``speed == 1.0``, otherwise a new array.

        Raises:
            ValueError: If *speed* is not a positive number.
        """
        if not speed > 0:
            raise ValueError(f"speed must be positive, got {speed}")
        if speed == 1.0:
            return audio

        indices = np.arange(0, len(audio), speed)
        # Guard against float rounding producing an index == len(audio).
        indices = indices[indices < len(audio)].astype(int)
        return audio[indices]

    def mix_audio(
        self,
        audio1: np.ndarray,
        audio2: np.ndarray,
        mix_ratio: float = 0.5
    ) -> np.ndarray:
        """Blend two signals; the shorter one is zero-padded at the end.

        Args:
            audio1: First signal, weighted by ``1 - mix_ratio``.
            audio2: Second signal, weighted by ``mix_ratio``.
            mix_ratio: Weight of *audio2*.

        Returns:
            The weighted sum, as long as the longer input.
        """
        max_len = max(len(audio1), len(audio2))

        padded1 = self._pad_to(audio1, max_len)
        padded2 = self._pad_to(audio2, max_len)

        return padded1 * (1 - mix_ratio) + padded2 * mix_ratio

    @staticmethod
    def _pad_to(audio: np.ndarray, length: int) -> np.ndarray:
        """Zero-pad *audio* at the end of axis 0 up to *length* entries."""
        # Only the first axis is padded; trailing axes keep their size.
        pad_width = [(0, length - len(audio))] + [(0, 0)] * (audio.ndim - 1)
        return np.pad(audio, pad_width)


class AudioAnalyzer:
    """Basic analysis helpers for one-dimensional audio signals."""

    def __init__(self, sample_rate: int = 44100):
        """Remember the sample rate used to convert indices to time/Hz."""
        self.sample_rate = sample_rate

    def get_spectrum(self, audio: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Return the magnitude spectrum for the non-negative frequencies.

        Returns:
            ``(frequencies, magnitudes)`` where *frequencies* come from
            ``np.fft.fftfreq`` scaled by the sample rate and *magnitudes*
            are the absolute FFT values of the same bins.
        """
        fft_result = np.fft.fft(audio)
        frequencies = np.fft.fftfreq(len(audio), 1 / self.sample_rate)

        positive_freq_mask = frequencies >= 0
        return frequencies[positive_freq_mask], np.abs(fft_result[positive_freq_mask])

    def get_mfcc(self, audio: np.ndarray, n_mfcc: int = 13) -> np.ndarray:
        """Extract MFCC features with librosa.

        Returns:
            The array produced by ``librosa.feature.mfcc``.  If librosa
            cannot be imported, a zero array of shape
            ``(n_mfcc, len(audio) // 512 + 1)`` is returned instead.
        """
        try:
            import librosa
            return librosa.feature.mfcc(
                y=audio.astype(float),
                sr=self.sample_rate,
                n_mfcc=n_mfcc
            )
        except ImportError:
            # Best-effort placeholder so callers still get an array.
            return np.zeros((n_mfcc, len(audio) // 512 + 1))

    def detect_silence(
        self,
        audio: np.ndarray,
        threshold: float = 0.01,
        min_duration: float = 0.1
    ) -> List[Tuple[float, float]]:
        """Find runs of samples whose magnitude stays below *threshold*.

        Args:
            audio: One-dimensional signal.
            threshold: Magnitude below which a sample counts as silent.
            min_duration: Minimum run length, in the time unit implied by
                ``sample_rate``, for a run to be reported.

        Returns:
            ``(start, end)`` pairs in time units, *end* being exclusive.
            A run that lasts until the end of the signal is included.
        """
        is_silent = np.abs(np.asarray(audio)) < threshold
        if is_silent.size == 0:
            return []

        # Pad with False on both sides so that a run touching either end of
        # the signal still produces both a rising and a falling edge.
        padded = np.concatenate(([False], is_silent, [False]))
        edges = np.flatnonzero(padded[1:] != padded[:-1])
        # Edges alternate: even positions open a run, odd positions close it.
        starts = edges[0::2].tolist()
        ends = edges[1::2].tolist()

        silent_regions = []
        for start, end in zip(starts, ends):
            if (end - start) / self.sample_rate >= min_duration:
                silent_regions.append((
                    start / self.sample_rate,
                    end / self.sample_rate
                ))
        return silent_regions

    def get_volume_envelope(
        self,
        audio: np.ndarray,
        window_size: int = 1024
    ) -> np.ndarray:
        """Return a moving average of ``|audio|`` with the length of *audio*.

        Args:
            audio: One-dimensional signal.
            window_size: Number of samples in the averaging window.

        Returns:
            The centred moving average, always ``len(audio)`` samples long,
            even when the window is longer than the signal.

        Raises:
            ValueError: If *window_size* is not positive.
        """
        if window_size <= 0:
            raise ValueError(f"window_size must be positive, got {window_size}")

        abs_audio = np.abs(audio)
        if abs_audio.size == 0:
            return np.zeros(0)

        kernel = np.ones(window_size) / window_size
        # mode='same' would return max(len(audio), window_size) samples, so
        # take the centred slice of the full convolution instead.  For
        # window_size <= len(audio) this is exactly what mode='same' yields.
        full = np.convolve(abs_audio, kernel, mode='full')
        offset = (window_size - 1) // 2
        return full[offset:offset + len(abs_audio)]

49.5.2 视频处理常见问题

python
import numpy as np
from typing import Tuple, List, Optional, Dict
from dataclasses import dataclass

@dataclass
class VideoConfig:
    """Video configuration stored by ``VideoProcessor``."""
    # Frame width, presumably in pixels (not read by code visible here).
    width: int = 1920
    # Frame height, presumably in pixels (not read by code visible here).
    height: int = 1080
    # Frames per second (not read by code visible here).
    fps: int = 30
    # Codec name (not read by code visible here).
    codec: str = "h264"


class VideoProcessor:
    """Per-frame image operations backed by OpenCV when it is available.

    ``cv2`` is imported lazily inside the methods that need it.  When it is
    not installed, the cv2-only operations return the input frame unchanged
    and ``add_watermark`` falls back to a NumPy blend.
    """

    def __init__(self, config: Optional["VideoConfig"] = None):
        """Store *config*, falling back to a default ``VideoConfig``."""
        self.config = config or VideoConfig()

    def resize(
        self,
        frame: np.ndarray,
        target_size: Tuple[int, int]
    ) -> np.ndarray:
        """Resize *frame* with ``cv2.resize``.

        Args:
            frame: Image array.
            target_size: Passed to ``cv2.resize`` as the output size, which
                OpenCV interprets as ``(width, height)``.

        Returns:
            The resized frame, or *frame* unchanged without OpenCV.
        """
        try:
            import cv2
            return cv2.resize(frame, target_size)
        except ImportError:
            return frame

    def crop(
        self,
        frame: np.ndarray,
        x: int,
        y: int,
        width: int,
        height: int
    ) -> np.ndarray:
        """Return the ``height`` x ``width`` view of *frame* starting at (x, y)."""
        return frame[y:y+height, x:x+width]

    def rotate(self, frame: np.ndarray, angle: float) -> np.ndarray:
        """Rotate *frame* by *angle* degrees about its centre.

        The output keeps the input size, so corners may be cut off.

        Returns:
            The rotated frame, or *frame* unchanged without OpenCV.
        """
        try:
            import cv2
            h, w = frame.shape[:2]
            center = (w // 2, h // 2)
            matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
            return cv2.warpAffine(frame, matrix, (w, h))
        except ImportError:
            return frame

    def apply_filter(
        self,
        frame: np.ndarray,
        filter_type: str
    ) -> np.ndarray:
        """Apply a named filter to *frame*.

        Supported names: ``"grayscale"`` (BGR to gray), ``"blur"`` (15x15
        Gaussian), ``"sharpen"`` (3x3 kernel) and ``"edge"`` (Canny with
        thresholds 100 and 200).  Any other name returns *frame* unchanged,
        as does a missing OpenCV installation.
        """
        try:
            import cv2

            if filter_type == "grayscale":
                return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            elif filter_type == "blur":
                return cv2.GaussianBlur(frame, (15, 15), 0)
            elif filter_type == "sharpen":
                kernel = np.array([[-1, -1, -1],
                                   [-1,  9, -1],
                                   [-1, -1, -1]])
                return cv2.filter2D(frame, -1, kernel)
            elif filter_type == "edge":
                return cv2.Canny(frame, 100, 200)
            else:
                return frame
        except ImportError:
            return frame

    def add_watermark(
        self,
        frame: np.ndarray,
        watermark: np.ndarray,
        position: Tuple[int, int] = (0, 0),
        opacity: float = 0.5
    ) -> np.ndarray:
        """Blend *watermark* onto a copy of *frame*.

        Args:
            frame: Target image.
            watermark: Image to overlay; assumed to have the same channel
                layout and dtype as *frame*.
            position: ``(x, y)`` of the watermark's top-left corner in
                frame coordinates.  Parts that fall outside the frame are
                clipped.
            opacity: Weight of the watermark; the frame gets ``1 - opacity``.

        Returns:
            A new frame; *frame* itself is never modified.
        """
        x, y = position
        frame_h, frame_w = frame.shape[:2]
        mark_h, mark_w = watermark.shape[:2]

        # Intersect the watermark rectangle with the frame so that an
        # overhanging (or negative-offset) watermark neither wraps around
        # via negative indexing nor mismatches the ROI size.
        x0, y0 = max(x, 0), max(y, 0)
        x1, y1 = min(x + mark_w, frame_w), min(y + mark_h, frame_h)

        result = frame.copy()
        if x0 >= x1 or y0 >= y1:
            return result  # watermark lies entirely outside the frame

        roi = frame[y0:y1, x0:x1]
        patch = watermark[y0 - y:y1 - y, x0 - x:x1 - x]
        result[y0:y1, x0:x1] = self._blend(roi, 1 - opacity, patch, opacity)
        return result

    @staticmethod
    def _blend(
        src1: np.ndarray,
        alpha: float,
        src2: np.ndarray,
        beta: float
    ) -> np.ndarray:
        """Return ``src1 * alpha + src2 * beta`` in the dtype of *src1*."""
        try:
            import cv2
        except ImportError:
            mixed = src1.astype(np.float64) * alpha + src2.astype(np.float64) * beta
            if np.issubdtype(src1.dtype, np.integer):
                # Round and saturate the way an integer image blend should.
                limits = np.iinfo(src1.dtype)
                mixed = np.clip(np.rint(mixed), limits.min, limits.max)
            return mixed.astype(src1.dtype)
        return cv2.addWeighted(src1, alpha, src2, beta, 0)


class VideoEditor:
    """Collects clips and builds simple transitions and text overlays."""

    def __init__(self):
        """Start with an empty clip list."""
        self.clips: List[Dict] = []

    def add_clip(
        self,
        frames: List[np.ndarray],
        start_time: float,
        duration: float
    ) -> None:
        """Append a clip record with keys ``frames``, ``start_time``, ``duration``."""
        self.clips.append({
            "frames": frames,
            "start_time": start_time,
            "duration": duration
        })

    def create_transition(
        self,
        frame1: np.ndarray,
        frame2: np.ndarray,
        transition_type: str = "fade",
        duration_frames: int = 30
    ) -> List[np.ndarray]:
        """Build the frames of a transition from *frame1* to *frame2*.

        Args:
            frame1: Frame the transition starts from.
            frame2: Frame the transition moves towards; assumed to have the
                same shape and dtype as *frame1*.
            transition_type: ``"fade"`` (cross-dissolve) or ``"wipe"``
                (*frame2* is revealed from the left edge).
            duration_frames: Number of frames to generate.

        Returns:
            ``duration_frames`` frames; frame ``i`` is at progress
            ``i / duration_frames``, so the first equals *frame1* and the
            last stops one step short of *frame2*.  An unknown
            *transition_type* yields an empty list.
        """
        frames: List[np.ndarray] = []

        if transition_type == "fade":
            for i in range(duration_frames):
                alpha = i / duration_frames
                frames.append(self._blend(frame1, 1 - alpha, frame2, alpha))

        elif transition_type == "wipe":
            width = frame1.shape[1]
            for i in range(duration_frames):
                progress = i / duration_frames
                split = int(width * progress)

                # Columns left of the split already show the incoming frame;
                # the rest still shows the outgoing one.
                result = frame1.copy()
                result[:, :split] = frame2[:, :split]
                frames.append(result)

        return frames

    def add_text_overlay(
        self,
        frame: np.ndarray,
        text: str,
        position: Tuple[int, int],
        font_scale: float = 1.0,
        color: Tuple[int, int, int] = (255, 255, 255)
    ) -> np.ndarray:
        """Draw *text* on a copy of *frame* with ``cv2.putText``.

        Uses ``FONT_HERSHEY_SIMPLEX`` with a thickness of 2.

        Returns:
            The annotated copy, or *frame* unchanged without OpenCV.
        """
        try:
            import cv2
            result = frame.copy()
            cv2.putText(
                result, text, position,
                cv2.FONT_HERSHEY_SIMPLEX,
                font_scale, color, 2
            )
            return result
        except ImportError:
            return frame

    @staticmethod
    def _blend(
        src1: np.ndarray,
        alpha: float,
        src2: np.ndarray,
        beta: float
    ) -> np.ndarray:
        """Return ``src1 * alpha + src2 * beta`` in the dtype of *src1*."""
        try:
            import cv2
        except ImportError:
            # NumPy fallback so a fade still works without OpenCV.
            mixed = src1.astype(np.float64) * alpha + src2.astype(np.float64) * beta
            if np.issubdtype(src1.dtype, np.integer):
                limits = np.iinfo(src1.dtype)
                mixed = np.clip(np.rint(mixed), limits.min, limits.max)
            return mixed.astype(src1.dtype)
        return cv2.addWeighted(src1, alpha, src2, beta, 0)


class FrameExtractor:
    """Dumps frames of a video file to JPEG images using OpenCV."""

    def __init__(self, video_path: str):
        """Remember the path of the video to read."""
        self.video_path = video_path

    def extract_frames(
        self,
        output_dir: str,
        fps: Optional[int] = None
    ) -> List[str]:
        """Write every N-th frame to *output_dir* as ``frame_XXXXXX.jpg``.

        N is ``int(video_fps / fps)``, but never less than 1, so asking for
        more frames per second than the video has keeps every frame instead
        of dividing by zero.

        Args:
            output_dir: Directory for the images; created if missing.
            fps: Desired extraction rate.  ``None`` or 0 means the video's
                own frame rate, i.e. every frame.

        Returns:
            Paths of the images that were actually written, in frame
            order.  Empty if OpenCV is missing or the video cannot be opened.
        """
        try:
            import cv2
        except ImportError:
            return []
        import os

        cap = cv2.VideoCapture(self.video_path)
        try:
            if not cap.isOpened():
                return []

            video_fps = cap.get(cv2.CAP_PROP_FPS)
            extract_fps = fps or video_fps
            # Some containers report 0 fps; fall back to keeping every frame.
            step = max(1, int(video_fps / extract_fps)) if extract_fps > 0 else 1

            if output_dir:
                os.makedirs(output_dir, exist_ok=True)

            frame_paths = []
            frame_count = 0

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                if frame_count % step == 0:
                    frame_path = f"{output_dir}/frame_{frame_count:06d}.jpg"
                    # imwrite reports failure through its return value.
                    if cv2.imwrite(frame_path, frame):
                        frame_paths.append(frame_path)

                frame_count += 1

            return frame_paths
        finally:
            # Release the capture even if reading or writing raised.
            cap.release()

49.5.3 性能优化策略

python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Tuple

class MediaOptimizer:
    """Stateless helpers for speeding up media processing."""

    @staticmethod
    def parallel_process_frames(
        frames: List[Any],
        process_func: Callable,
        n_workers: int = None
    ) -> List[Any]:
        """Apply *process_func* to every frame in a process pool.

        Args:
            frames: Items to process; they and *process_func* must be
                picklable because they cross process boundaries.
            process_func: Function called once per frame.
            n_workers: Pool size; ``None`` or 0 means one per CPU.

        Returns:
            Results in the same order as *frames*.
        """
        if len(frames) == 0:
            # Do not pay for starting a pool when there is nothing to do.
            return []

        n_workers = n_workers or multiprocessing.cpu_count()
        # More workers than frames would only sit idle.
        n_workers = min(n_workers, len(frames))

        with ProcessPoolExecutor(max_workers=n_workers) as executor:
            return list(executor.map(process_func, frames))

    @staticmethod
    def batch_process(
        items: List[Any],
        process_func: Callable,
        batch_size: int = 100
    ) -> List[Any]:
        """Apply *process_func* to *items*, walking them in fixed-size batches.

        Args:
            items: Items to process.
            process_func: Function called once per item.
            batch_size: Number of items per batch; must be positive.

        Returns:
            Results in the same order as *items*.

        Raises:
            ValueError: If *batch_size* is not positive.
        """
        if batch_size <= 0:
            # A negative step would make range() empty and silently drop
            # every item.
            raise ValueError(f"batch_size must be positive, got {batch_size}")

        results = []
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            results.extend(process_func(item) for item in batch)
        return results

    @staticmethod
    def get_optimal_settings(
        resolution: Tuple[int, int],
        target_size_mb: float
    ) -> Dict:
        """Pick encoder settings from the target file size.

        Args:
            resolution: Source ``(width, height)``.
            target_size_mb: Desired output size in megabytes.

        Returns:
            A dict with keys ``crf``, ``preset`` and ``resolution``.  Below
            10 MB the resolution is halved; below 50 MB medium settings are
            used; otherwise the highest-quality settings.
        """
        width, height = resolution

        if target_size_mb < 10:
            return {
                "crf": 28,
                "preset": "fast",
                "resolution": (width // 2, height // 2)
            }
        elif target_size_mb < 50:
            return {
                "crf": 23,
                "preset": "medium",
                "resolution": (width, height)
            }
        else:
            return {
                "crf": 18,
                "preset": "slow",
                "resolution": (width, height)
            }


class MemoryEfficientProcessor:
    """Processes a video in batches sized to fit a memory budget."""

    def __init__(self, max_memory_mb: int = 1024):
        """Store the memory budget, converted from megabytes to bytes."""
        self.max_memory = max_memory_mb * 1024 * 1024

    def process_large_video(
        self,
        video_path: str,
        process_func: Callable,
        output_path: str
    ) -> None:
        """Run *process_func* over every frame and write the result.

        Frames are buffered until roughly ``max_memory`` bytes are held
        (estimated as width * height * 3 per frame), then processed and
        written in one go.

        Args:
            video_path: Input video.
            process_func: Called with one frame, returns the frame to write.
            output_path: Output file; written with the ``mp4v`` fourcc at
                the input's frame rate and size.

        Raises:
            ImportError: If OpenCV is not installed.
            IOError: If *video_path* cannot be opened.
        """
        import cv2

        cap = cv2.VideoCapture(video_path)
        out = None
        try:
            if not cap.isOpened():
                # Without this check the zero frame size reported for an
                # unopened capture ends in a ZeroDivisionError below.
                raise IOError(f"cannot open video: {video_path}")

            fps = cap.get(cv2.CAP_PROP_FPS)
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

            frame_size = width * height * 3
            # Always allow at least one frame per batch, even when a single
            # frame exceeds the budget or the size is reported as 0.
            batch_size = max(1, self.max_memory // frame_size) if frame_size else 1

            batch = []
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                batch.append(frame)

                if len(batch) >= batch_size:
                    self._flush(batch, process_func, out)
                    batch = []

            if batch:
                self._flush(batch, process_func, out)
        finally:
            # Release both handles even if reading or processing raised.
            cap.release()
            if out is not None:
                out.release()

    @staticmethod
    def _flush(batch: list, process_func: Callable, writer) -> None:
        """Process every frame of *batch*, then write the results in order."""
        processed = [process_func(f) for f in batch]
        for p in processed:
            writer.write(p)

49.6 本章小结

本章详细介绍了Python多媒体处理的核心概念和实践:

  1. 音频处理:音频读取、格式转换、音频效果
  2. 音频分析:频谱分析、特征提取、音频生成
  3. 视频处理:视频读取、格式转换、视频编辑
  4. 视频特效:滤镜效果、转场动画、字幕添加

练习题

  1. 实现一个音频编辑器,支持剪切、合并、特效处理
  2. 开发一个视频转码工具,支持多种格式转换
  3. 实现一个音频特征提取系统,支持音乐分类
  4. 开发一个视频水印工具,支持图片和文字水印
  5. 实现一个简单的视频剪辑软件,支持剪辑和转场

扩展阅读

Python技术丛书 - 江苏省宿城中等专业学校