Examples¶
The following examples can be found in the examples directory of the PyOgg GitHub repository.
You can run these examples either by downloading the appropriate file(s) or cloning the repository. Note that some examples assume that the demonstration wave and Opus files are available.
Play an OggOpus file¶
# This is an example of the use of PyOgg.
#
# Author: Matthew Walker 2020-06-01
#
# An Ogg Opus file (a file containing an Opus stream wrapped inside an
# Ogg container) is loaded using the Opusfile library. This provides
# the entire file in one PCM encoded buffer. That buffer is converted
# to a NumPy array and then played using simpleaudio.
#
# On successful execution of this program, you should hear the audio
# being played and the console will display comething like:
#
# $ python 01-play-opus-simpleaudio.py
# Reading Ogg Opus file...
#
# Read Ogg Opus file
# Channels:
# 2
# Frequency (samples per second):
# 48000
# Buffer Length (bytes):
# 960000
# Shape of numpy array (number of samples per channel, number of channels):
# (240000, 2)
#
# Playing...
# Finished.
try:
import pyogg
import simpleaudio as sa # type: ignore
import numpy # type: ignore
except ImportError:
import os
should_install_requirements = input(\
"This example requires additional libraries to work.\n" +
" py-simple-audio (simpleaudio),\n" +
" NumPy (numpy)\n" +
" And PyOgg or course.\n" +
"Would you like to install them right now?\n"+
"(Y/N): ")
if should_install_requirements.lower() == "y":
import subprocess, sys
install_command = [
sys.executable,
"-m",
"pip",
"install",
"-r",
os.path.realpath("01-play-opus-simpleaudio.requirements.txt")
]
popen = subprocess.Popen(install_command,
stdout=subprocess.PIPE, universal_newlines=True)
assert popen.stdout is not None
for stdout_line in iter(popen.stdout.readline, ""):
print(stdout_line, end="")
popen.stdout.close()
popen.wait()
print("Done.\n")
import pyogg
import simpleaudio as sa # type: ignore
import numpy # type: ignore
else:
os._exit(0)
import ctypes
from datetime import datetime
# Specify the filename to read
filename = "left-right-demo-5s.opus"
# Read the file using OpusFile
print("Reading Ogg Opus file...")
opus_file = pyogg.OpusFile(filename)
# Display summary information about the audio
print("\nRead Ogg Opus file")
print("Channels:\n ", opus_file.channels)
print("Frequency (samples per second):\n ",opus_file.frequency)
print("Buffer Length (bytes):\n ", len(opus_file.buffer))
# Get the data as a NumPy array
buf = opus_file.as_array()
# The shape of the array can be read as
# "(number of samples per channel, number of channels)".
print(
"Shape of numpy array (number of samples per channel, "+
"number of channels):\n ",
buf.shape
)
# Play the audio
print("\nPlaying...")
play_obj = sa.play_buffer(buf,
opus_file.channels,
opus_file.bytes_per_sample,
opus_file.frequency)
# Wait until sound has finished playing
play_obj.wait_done()
print("Finished.")
Stream an OggOpus file¶
"""Reads an Ogg-Opus file using OpusFile and OpusFileStream.
Reads an Ogg-Opus file using OpusFileStream and compares it to the
results of reading it with OpusFile. Gives timing information for the
two approaches.
A typical output:
Read 240000 samples from OpusFile (in 53.8 milliseconds).
Read 240000 samples from the OpusFileStream
(in 252 reads averaging 0.23 milliseconds each).
OpusFileStream data was identical to OpusFile data.
"""
import time
import numpy # type: ignore
import pyogg
# Specify a file to process
opus_file_filename = "left-right-demo-5s.opus"
opus_file_stream_filename = "left-right-demo-5s.opus"
# Open the file using OpusFile, which reads the entire file
# immediately and places it into an internal buffer.
start_time = time.time()
opus_file = pyogg.OpusFile(opus_file_filename)
end_time = time.time()
duration = (end_time-start_time)*1000
array = opus_file.as_array()
array_index = 0
print("Read {:d} samples from OpusFile (in {:.1f} milliseconds).".format(
len(array),
duration
))
# Open the file using OpusFileStream, which does not read the entire
# file immediately.
stream = pyogg.OpusFileStream(opus_file_stream_filename)
# Loop through the OpusFileStream until we've read all the data
samples_read = 0
identical = True
times = []
while True:
# Read the next part of the stream
start_time = time.time()
buf = stream.get_buffer_as_array()
end_time = time.time()
duration = (end_time-start_time)*1000
times.append(duration)
# Check if we've reached the end of the stream
if buf is None:
break
# Increment the number of samples read
samples_read += len(buf)
# Check we've not read too much data from the stream
if array_index+len(buf) > len(array):
print("OpusFileStream data was identical to OpusFile data,\n"+
"however there was more data in the OpusFileStream than\n"+
"in the OpusFile.")
identical = False
break
# Compare the stream with the OpusFile data. (They should be
# identical.)
comparison = array[array_index:array_index+len(buf)] == buf
if not numpy.all(comparison):
print("OpusFileStream data was NOT identical to OpusFile data.")
identical = False
break
# Move the OpusFile index along
array_index += len(buf)
avg_time = numpy.mean(times)
print(
("Read {:d} samples from the OpusFileStream\n"+
"(in {:d} reads averaging {:.2f} milliseconds each).").format(
samples_read,
len(times),
avg_time
)
)
if identical == False:
# We've finished our work here
pass
elif array_index == len(array):
# We completed the comparison successfully.
print("OpusFileStream data was identical to OpusFile data.")
else:
# There was remaining data
print("OpusFileStream data was identical to OpusFile data,\n"+
"however there was more data in the OpusFile than\n"+
"in the OpusFileStream.")
Encode and Decode Opus-Packets¶
import wave
from pyogg import OpusEncoder
from pyogg import OpusDecoder
if __name__ == "__main__":
# Setup encoding
# ==============
# Read a wav file to obtain PCM data
filename = "left-right-demo-5s.wav"
wave_read = wave.open(filename, "rb")
print("Reading wav from file '{:s}'".format(filename))
# Extract the wav's specification
channels = wave_read.getnchannels()
print("Number of channels:", channels)
samples_per_second = wave_read.getframerate()
print("Sampling frequency:", samples_per_second)
bytes_per_sample = wave_read.getsampwidth()
# Create an Opus encoder
opus_encoder = OpusEncoder()
opus_encoder.set_application("audio")
opus_encoder.set_sampling_frequency(samples_per_second)
opus_encoder.set_channels(channels)
# Calculate the desired frame size (in samples per channel)
desired_frame_duration = 20/1000 # milliseconds
desired_frame_size = int(desired_frame_duration * samples_per_second)
# Setup decoding
# ==============
# Create an Opus decoder
opus_decoder = OpusDecoder()
opus_decoder.set_channels(channels)
opus_decoder.set_sampling_frequency(samples_per_second)
# Open an output wav for the decoded PCM
output_filename = "output-"+filename
wave_write = wave.open(output_filename, "wb")
print("Writing wav into file '{:s}'".format(output_filename))
# Save the wav's specification
wave_write.setnchannels(channels)
wave_write.setframerate(samples_per_second)
wave_write.setsampwidth(bytes_per_sample)
# Execute encode-decode
# =====================
# Loop through the wav file's PCM data and encode it as Opus
bytes_encoded = 0
while True:
# Get data from the wav file
pcm = wave_read.readframes(desired_frame_size)
# Check if we've finished reading the wav file
if len(pcm) == 0:
break
# Calculate the effective frame size from the number of bytes
# read
effective_frame_size = (
len(pcm) # bytes
// bytes_per_sample
// channels
)
# Check if we've received enough data
if effective_frame_size < desired_frame_size:
# We haven't read a full frame from the wav file, so this
# is most likely a final partial frame before the end of
# the file. We'll pad the end of this frame with silence.
pcm += (
b"\x00"
* ((desired_frame_size - effective_frame_size)
* bytes_per_sample
* channels)
)
# Encode the PCM data
encoded_packet = opus_encoder.encode(pcm)
bytes_encoded += len(encoded_packet)
# At this stage we now have a buffer containing an
# Opus-encoded packet. This could be sent over UDP, for
# example, and then decoded with OpusDecoder. However it
# cannot really be saved to a file without wrapping it in the
# likes of an Ogg stream; for this see OggOpusWriter.
# For this example, we will now immediately decode this
# encoded packet using OpusDecoder.
decoded_pcm = opus_decoder.decode(encoded_packet)
# Save the decoded PCM as a new wav file
wave_write.writeframes(decoded_pcm)
wave_read.close()
wave_write.close()
print("Total bytes of encoded packets:", bytes_encoded)
print("Finished.")
Buffered-Encode and Decode Opus-Packets¶
import wave
from pyogg import OpusBufferedEncoder
from pyogg import OpusDecoder
if __name__ == "__main__":
# Setup encoding
# ==============
# Read a wav file to obtain PCM data
filename = "left-right-demo-5s.wav"
wave_read = wave.open(filename, "rb")
print("Reading wav from file '{:s}'".format(filename))
# Extract the wav's specification
channels = wave_read.getnchannels()
print("Number of channels:", channels)
samples_per_second = wave_read.getframerate()
print("Sampling frequency:", samples_per_second)
bytes_per_sample = wave_read.getsampwidth()
# Create an Opus encoder
opus_encoder = OpusBufferedEncoder()
opus_encoder.set_application("audio")
opus_encoder.set_sampling_frequency(samples_per_second)
opus_encoder.set_channels(channels)
desired_frame_duration = 20 # ms
opus_encoder.set_frame_size(desired_frame_duration)
# Setup decoding
# ==============
# Create an Opus decoder
opus_decoder = OpusDecoder()
opus_decoder.set_channels(channels)
opus_decoder.set_sampling_frequency(samples_per_second)
# Open an output wav for the decoded PCM
output_filename = "output-"+filename
wave_write = wave.open(output_filename, "wb")
print("Writing wav into file '{:s}'".format(output_filename))
# Save the wav's specification
wave_write.setnchannels(channels)
wave_write.setframerate(samples_per_second)
wave_write.setsampwidth(bytes_per_sample)
# Execute encode-decode
# =====================
# Loop through the wav file's PCM data and encode it as Opus
bytes_encoded = 0
finished = False
while not finished:
# Get data from the wav file
frames_per_read = 1000
pcm = wave_read.readframes(frames_per_read)
# Check if we've finished reading the wav file
if len(pcm) == 0:
# Encode what's left of the PCM and flush it by filling
# any partial frames with silence.
finished = True
# Encode the PCM data
encoded_packets = opus_encoder.buffered_encode(
memoryview(bytearray(pcm)), # FIXME
flush=finished
)
# At this stage we now have a list of Opus-encoded packets.
# These could be sent over UDP, for example, and then decoded
# with OpusDecoder. However they cannot really be saved to a
# file without wrapping them in the likes of an Ogg stream;
# for this see OggOpusWriter.
# For this example, we will now immediately decode the encoded
# packets using OpusDecoder.
for encoded_packet, _, _ in encoded_packets:
bytes_encoded += len(encoded_packet)
decoded_pcm = opus_decoder.decode(encoded_packet)
# Save the decoded PCM as a new wav file
wave_write.writeframes(decoded_pcm)
wave_read.close()
wave_write.close()
print("Total bytes of encoded packets:", bytes_encoded)
print("Finished.")
Write an OggOpus File¶
import wave
import pyogg
if __name__ == "__main__":
# Read a wav file to obtain PCM data
filename = "left-right-demo-5s.wav"
wave_read = wave.open(filename, "rb")
print("Reading wav from file '{:s}'".format(filename))
# Extract the wav's specification
channels = wave_read.getnchannels()
print("Number of channels:", channels)
samples_per_second = wave_read.getframerate()
print("Sampling frequency:", samples_per_second)
bytes_per_sample = wave_read.getsampwidth()
original_length = wave_read.getnframes()
print("Length:", original_length)
# Create a OpusBufferedEncoder
opus_buffered_encoder = pyogg.OpusBufferedEncoder()
opus_buffered_encoder.set_application("audio")
opus_buffered_encoder.set_sampling_frequency(samples_per_second)
opus_buffered_encoder.set_channels(channels)
opus_buffered_encoder.set_frame_size(20) # milliseconds
# Create an OggOpusWriter
output_filename = filename+".opus"
print("Writing OggOpus file to '{:s}'".format(output_filename))
ogg_opus_writer = pyogg.OggOpusWriter(
output_filename,
opus_buffered_encoder
)
# Calculate the desired frame size (in samples per channel)
desired_frame_duration = 20/1000 # milliseconds
desired_frame_size = int(desired_frame_duration * samples_per_second)
# Loop through the wav file's PCM data and write it as OggOpus
chunk_size = 1000 # bytes
while True:
# Get data from the wav file
pcm = wave_read.readframes(chunk_size)
# Check if we've finished reading the wav file
if len(pcm) == 0:
break
# Encode the PCM data
ogg_opus_writer.write(
memoryview(bytearray(pcm)) # FIXME
)
# We've finished writing the file
ogg_opus_writer.close()
# Check that the output file is that same length as the original
print("Reading output file:", output_filename)
opus_file = pyogg.OpusFile(output_filename)
print("File read")
output_length = opus_file.as_array().shape[0]
print("Output length:", output_length)
if original_length != output_length:
print("ERROR: The original length is different to the output length")
print("Finished.")