Examples

The following examples can be found in the examples directory of the PyOgg GitHub repository.

You can run these examples either by downloading the appropriate file(s) or cloning the repository. Note that some examples assume that the demonstration wave and Opus files are available.

Play an OggOpus file

# This is an example of the use of PyOgg.
#
# Author: Matthew Walker 2020-06-01
#
# An Ogg Opus file (a file containing an Opus stream wrapped inside an
# Ogg container) is loaded using the Opusfile library.  This provides
# the entire file in one PCM encoded buffer.  That buffer is converted
# to a NumPy array and then played using simpleaudio.
#
# On successful execution of this program, you should hear the audio
# being played and the console will display something like:
#
#    $ python 01-play-opus-simpleaudio.py
#    Reading Ogg Opus file...
#    
#    Read Ogg Opus file
#    Channels:
#       2
#    Frequency (samples per second):
#       48000
#    Buffer Length (bytes):
#       960000
#    Shape of numpy array (number of samples per channel, number of channels):
#       (240000, 2)
#    
#    Playing...
#    Finished.


# Import the third-party libraries this example depends upon.  If any
# of them is missing, offer to install them with pip and then retry
# the imports.
try:
    import pyogg
    import simpleaudio as sa # type: ignore
    import numpy # type: ignore
except ImportError:
    import os
    import subprocess
    import sys

    should_install_requirements = input(
        "This example requires additional libraries to work.\n"
        "  py-simple-audio (simpleaudio),\n"
        "  NumPy (numpy)\n"
        "  And PyOgg of course.\n"
        "Would you like to install them right now?\n"
        "(Y/N): "
    )
    if should_install_requirements.strip().lower() != "y":
        # The user declined.  sys.exit() is used rather than
        # os._exit() so that the interpreter shuts down normally
        # (flushing buffers and running clean-up handlers).
        sys.exit(0)

    # Run pip with the same interpreter that is running this example
    # so that the packages are installed into the matching
    # environment.  The requirements file is looked up relative to
    # the current working directory.
    install_command = [
        sys.executable,
        "-m",
        "pip",
        "install",
        "-r",
        os.path.realpath("01-play-opus-simpleaudio.requirements.txt")
    ]

    # pip inherits our stdout and stderr, so its progress is displayed
    # live without us needing to relay it line by line.
    completed = subprocess.run(install_command)
    if completed.returncode != 0:
        # Stop here with a clear message instead of failing on the
        # imports below.
        sys.exit(
            "Installing the requirements failed "
            "(pip exited with status {:d}).".format(completed.returncode)
        )

    print("Done.\n")

    import pyogg
    import simpleaudio as sa # type: ignore
    import numpy # type: ignore
        
import ctypes
from datetime import datetime


# The Ogg Opus file that this example plays
filename = "left-right-demo-5s.opus"

# Load the file with OpusFile
print("Reading Ogg Opus file...")
opus_file = pyogg.OpusFile(filename)

# Report the basic properties of the audio that was read
print("\nRead Ogg Opus file")
channel_count = opus_file.channels
print("Channels:\n  ", channel_count)
sample_rate = opus_file.frequency
print("Frequency (samples per second):\n  ", sample_rate)
buffer_length = len(opus_file.buffer)
print("Buffer Length (bytes):\n  ", buffer_length)

# Obtain the audio data as a NumPy array
buf = opus_file.as_array()

# The array's shape is
# "(number of samples per channel, number of channels)".
shape_label = (
    "Shape of numpy array (number of samples per channel, "
    "number of channels):\n  "
)
print(shape_label, buf.shape)

# Hand the audio to simpleaudio for playback
print("\nPlaying...")
play_obj = sa.play_buffer(
    buf,
    opus_file.channels,
    opus_file.bytes_per_sample,
    opus_file.frequency
)

# Block until playback has completed
play_obj.wait_done()

print("Finished.")

Stream an OggOpus file

"""Reads an Ogg-Opus file using OpusFile and OpusFileStream.

Reads an Ogg-Opus file using OpusFileStream and compares it to the
results of reading it with OpusFile.  Gives timing information for the
two approaches.

A typical output:

Read 240000 samples from OpusFile (in 53.8 milliseconds).
Read 240000 samples from the OpusFileStream
(in 252 reads averaging 0.23 milliseconds each).
OpusFileStream data was identical to OpusFile data.

"""

import time

import numpy # type: ignore
import pyogg

# Specify a file to process
opus_file_filename = "left-right-demo-5s.opus"
opus_file_stream_filename = "left-right-demo-5s.opus"

# Open the file using OpusFile, which reads the entire file
# immediately and places it into an internal buffer.
#
# Durations are measured with time.perf_counter() rather than
# time.time(): it is a monotonic, high-resolution clock intended for
# timing short intervals, whereas the wall clock may be too coarse for
# reads that take a fraction of a millisecond.
start_time = time.perf_counter()
opus_file = pyogg.OpusFile(opus_file_filename)
end_time = time.perf_counter()
duration = (end_time-start_time)*1000 # milliseconds
array = opus_file.as_array()
array_index = 0
print("Read {:d} samples from OpusFile (in {:.1f} milliseconds).".format(
    len(array),
    duration
))

# Open the file using OpusFileStream, which does not read the entire
# file immediately.
stream = pyogg.OpusFileStream(opus_file_stream_filename)

# Loop through the OpusFileStream until we've read all the data
samples_read = 0
identical = True
times = [] # duration of every read, in milliseconds
while True:
    # Read the next part of the stream.  Note that the final read
    # (the one that returns None) is timed and counted too.
    start_time = time.perf_counter()
    buf = stream.get_buffer_as_array()
    end_time = time.perf_counter()
    duration = (end_time-start_time)*1000
    times.append(duration)

    # Check if we've reached the end of the stream
    if buf is None:
        break

    # Increment the number of samples read
    samples_read += len(buf)

    # Check we've not read too much data from the stream
    if array_index+len(buf) > len(array):
        print("OpusFileStream data was identical to OpusFile data,\n"+
              "however there was more data in the OpusFileStream than\n"+
              "in the OpusFile.")
        identical = False
        break

    # Compare the stream with the OpusFile data.  (They should be
    # identical.)
    comparison = array[array_index:array_index+len(buf)] == buf
    if not numpy.all(comparison):
        print("OpusFileStream data was NOT identical to OpusFile data.")
        identical = False
        break

    # Move the OpusFile index along
    array_index += len(buf)

avg_time = numpy.mean(times)
print(
    ("Read {:d} samples from the OpusFileStream\n"+
     "(in {:d} reads averaging {:.2f} milliseconds each).").format(
         samples_read,
         len(times),
         avg_time
     )
)

# If a difference was found it has already been reported inside the
# loop, so only the successful outcomes are reported here.
if identical:
    if array_index == len(array):
        # We completed the comparison successfully.
        print("OpusFileStream data was identical to OpusFile data.")
    else:
        # There was remaining data
        print("OpusFileStream data was identical to OpusFile data,\n"+
              "however there was more data in the OpusFile than\n"+
              "in the OpusFileStream.")

Encode and Decode Opus-Packets

import wave
from pyogg import OpusEncoder
from pyogg import OpusDecoder

# Reads a wav file, encodes it frame by frame as Opus packets,
# immediately decodes each packet again, and writes the decoded PCM
# to a second wav file.
if __name__ == "__main__":
    # Setup encoding
    # ==============

    # Read a wav file to obtain PCM data.  The file is opened with a
    # context manager so that it is closed even if an error occurs.
    filename = "left-right-demo-5s.wav"
    with wave.open(filename, "rb") as wave_read:
        print("Reading wav from file '{:s}'".format(filename))

        # Extract the wav's specification
        channels = wave_read.getnchannels()
        print("Number of channels:", channels)
        samples_per_second = wave_read.getframerate()
        print("Sampling frequency:", samples_per_second)
        bytes_per_sample = wave_read.getsampwidth()

        # Create an Opus encoder
        opus_encoder = OpusEncoder()
        opus_encoder.set_application("audio")
        opus_encoder.set_sampling_frequency(samples_per_second)
        opus_encoder.set_channels(channels)

        # Calculate the desired frame size (in samples per channel)
        desired_frame_duration = 20/1000 # seconds (that is, 20 ms)
        desired_frame_size = int(desired_frame_duration * samples_per_second)

        # Setup decoding
        # ==============

        # Create an Opus decoder
        opus_decoder = OpusDecoder()
        opus_decoder.set_channels(channels)
        opus_decoder.set_sampling_frequency(samples_per_second)

        # Open an output wav for the decoded PCM.  Again a context
        # manager ensures the file is closed whatever happens.
        output_filename = "output-"+filename
        with wave.open(output_filename, "wb") as wave_write:
            print("Writing wav into file '{:s}'".format(output_filename))

            # Save the wav's specification
            wave_write.setnchannels(channels)
            wave_write.setframerate(samples_per_second)
            wave_write.setsampwidth(bytes_per_sample)

            # Execute encode-decode
            # =====================

            # Loop through the wav file's PCM data and encode it as Opus
            bytes_encoded = 0
            while True:
                # Get data from the wav file
                pcm = wave_read.readframes(desired_frame_size)

                # Check if we've finished reading the wav file
                if len(pcm) == 0:
                    break

                # Calculate the effective frame size from the number
                # of bytes read
                effective_frame_size = (
                    len(pcm) # bytes
                    // bytes_per_sample
                    // channels
                )

                # Check if we've received enough data
                if effective_frame_size < desired_frame_size:
                    # We haven't read a full frame from the wav file,
                    # so this is most likely a final partial frame
                    # before the end of the file.  We'll pad the end
                    # of this frame with silence.
                    pcm += (
                        b"\x00"
                        * ((desired_frame_size - effective_frame_size)
                           * bytes_per_sample
                           * channels)
                    )

                # Encode the PCM data
                encoded_packet = opus_encoder.encode(pcm)
                bytes_encoded += len(encoded_packet)

                # At this stage we now have a buffer containing an
                # Opus-encoded packet.  This could be sent over UDP,
                # for example, and then decoded with OpusDecoder.
                # However it cannot really be saved to a file without
                # wrapping it in the likes of an Ogg stream; for this
                # see OggOpusWriter.

                # For this example, we will now immediately decode
                # this encoded packet using OpusDecoder.
                decoded_pcm = opus_decoder.decode(encoded_packet)

                # Append the decoded PCM to the output wav file
                wave_write.writeframes(decoded_pcm)

    print("Total bytes of encoded packets:", bytes_encoded)
    print("Finished.")

Buffered-Encode and Decode Opus-Packets

import wave
from pyogg import OpusBufferedEncoder
from pyogg import OpusDecoder

# Reads a wav file in fixed-size chunks, passes the chunks to an
# OpusBufferedEncoder, immediately decodes every packet it produces,
# and writes the decoded PCM to a second wav file.
if __name__ == "__main__":
    # Setup encoding
    # ==============

    # Read a wav file to obtain PCM data.  The file is opened with a
    # context manager so that it is closed even if an error occurs.
    filename = "left-right-demo-5s.wav"
    with wave.open(filename, "rb") as wave_read:
        print("Reading wav from file '{:s}'".format(filename))

        # Extract the wav's specification
        channels = wave_read.getnchannels()
        print("Number of channels:", channels)
        samples_per_second = wave_read.getframerate()
        print("Sampling frequency:", samples_per_second)
        bytes_per_sample = wave_read.getsampwidth()

        # Create an Opus encoder
        opus_encoder = OpusBufferedEncoder()
        opus_encoder.set_application("audio")
        opus_encoder.set_sampling_frequency(samples_per_second)
        opus_encoder.set_channels(channels)
        desired_frame_duration = 20 # ms
        opus_encoder.set_frame_size(desired_frame_duration)

        # Setup decoding
        # ==============

        # Create an Opus decoder
        opus_decoder = OpusDecoder()
        opus_decoder.set_channels(channels)
        opus_decoder.set_sampling_frequency(samples_per_second)

        # Open an output wav for the decoded PCM.  Again a context
        # manager ensures the file is closed whatever happens.
        output_filename = "output-"+filename
        with wave.open(output_filename, "wb") as wave_write:
            print("Writing wav into file '{:s}'".format(output_filename))

            # Save the wav's specification
            wave_write.setnchannels(channels)
            wave_write.setframerate(samples_per_second)
            wave_write.setsampwidth(bytes_per_sample)

            # Execute encode-decode
            # =====================

            # Number of frames requested from the wav file per read.
            # It does not need to match the encoder's frame size.
            frames_per_read = 1000

            # Loop through the wav file's PCM data and encode it as Opus
            bytes_encoded = 0
            finished = False
            while not finished:
                # Get data from the wav file
                pcm = wave_read.readframes(frames_per_read)

                # Check if we've finished reading the wav file
                if len(pcm) == 0:
                    # Encode what's left of the PCM and flush it by
                    # filling any partial frames with silence.
                    finished = True

                # Encode the PCM data.
                # NOTE(review): presumably buffered_encode() requires
                # a memoryview, hence the copy into a bytearray --
                # confirm before resolving the FIXME.
                encoded_packets = opus_encoder.buffered_encode(
                    memoryview(bytearray(pcm)), # FIXME
                    flush=finished
                )

                # At this stage we now have a list of Opus-encoded
                # packets.  These could be sent over UDP, for example,
                # and then decoded with OpusDecoder.  However they
                # cannot really be saved to a file without wrapping
                # them in the likes of an Ogg stream; for this see
                # OggOpusWriter.

                # For this example, we will now immediately decode the
                # encoded packets using OpusDecoder.  Each item is a
                # 3-tuple of which only the packet itself is needed.
                for encoded_packet, _, _ in encoded_packets:
                    bytes_encoded += len(encoded_packet)

                    decoded_pcm = opus_decoder.decode(encoded_packet)

                    # Append the decoded PCM to the output wav file
                    wave_write.writeframes(decoded_pcm)

    print("Total bytes of encoded packets:", bytes_encoded)
    print("Finished.")

Write an OggOpus File

import wave
import pyogg

# Reads a wav file, writes its PCM data to an OggOpus file using
# OggOpusWriter, then reads the new file back with OpusFile and
# compares its length against the original.
if __name__ == "__main__":
    # Read a wav file to obtain PCM data.  The file is opened with a
    # context manager so that it is always closed once we are done
    # reading from it.
    filename = "left-right-demo-5s.wav"
    with wave.open(filename, "rb") as wave_read:
        print("Reading wav from file '{:s}'".format(filename))

        # Extract the wav's specification
        channels = wave_read.getnchannels()
        print("Number of channels:", channels)
        samples_per_second = wave_read.getframerate()
        print("Sampling frequency:", samples_per_second)
        original_length = wave_read.getnframes()
        print("Length:", original_length)

        # Create an OpusBufferedEncoder
        opus_buffered_encoder = pyogg.OpusBufferedEncoder()
        opus_buffered_encoder.set_application("audio")
        opus_buffered_encoder.set_sampling_frequency(samples_per_second)
        opus_buffered_encoder.set_channels(channels)
        opus_buffered_encoder.set_frame_size(20) # milliseconds

        # Create an OggOpusWriter
        output_filename = filename+".opus"
        print("Writing OggOpus file to '{:s}'".format(output_filename))
        ogg_opus_writer = pyogg.OggOpusWriter(
            output_filename,
            opus_buffered_encoder
        )

        # Loop through the wav file's PCM data and write it as OggOpus.
        # readframes() takes a number of frames (one sample for every
        # channel), not a number of bytes.
        frames_per_read = 1000
        while True:
            # Get data from the wav file
            pcm = wave_read.readframes(frames_per_read)

            # Check if we've finished reading the wav file
            if len(pcm) == 0:
                break

            # Encode the PCM data
            ogg_opus_writer.write(
                memoryview(bytearray(pcm)) # FIXME
            )

    # We've finished writing the file
    ogg_opus_writer.close()

    # Check that the output file is the same length as the original
    print("Reading output file:", output_filename)
    opus_file = pyogg.OpusFile(output_filename)
    print("File read")
    output_length = opus_file.as_array().shape[0]
    print("Output length:", output_length)

    if original_length != output_length:
        print("ERROR: The original length is different to the output length")

    print("Finished.")