"""Provides the :class:`Audio` class that handles audio signals

.. note::

   Supports all audio formats handled by ffmpeg (wav, mp3, flac, etc.). See
   https://www.ffmpeg.org/general.html#File-Formats for details.

The :class:`Audio` class allows to load, save and manipulate
multichannels audio data. The underlying audio samples can be of one
of the following types (with the corresponding min and max):

========== =========== ===========
Type       Min         Max
========== =========== ===========
np.int16   -32768      +32767
np.int32   -2147483648 +2147483647
np.float32 -1.0        +1.0
np.float64 -1.0        +1.0
========== =========== ===========

When loading an audio file with :func:`Audio.load`, those min/max are
expected to be respected. When creating an :class:`Audio` instance
from a raw data array, the ``validate`` parameter in the class
constructor and the method :func:`Audio.is_valid` make sure the data
type and min/max are respected.
Examples
--------
>>> import os
>>> import numpy as np
>>> from shennong.audio import Audio
Create 1000 samples of a stereo signal at 16 kHz:
>>> audio = Audio(np.random.random((1000, 2)), 16000)
>>> audio.data.shape
(1000, 2)
>>> audio.dtype
dtype('float64')
>>> audio.sample_rate
16000
>>> audio.nchannels
2
>>> audio.duration
0.0625
Resample the signal to 8 kHz and convert it to 16 bits integers:
>>> audio2 = audio.resample(8000).astype(np.int16)
>>> audio2.sample_rate
8000
>>> audio2.duration == audio.duration
True
>>> audio2.dtype
dtype('int16')
>>> audio2.is_valid()
True
Save the :class:`Audio` instance as a wav file, load an existing wav
file as an :class:`Audio` instance:
>>> audio.save('stereo.wav')
>>> audio3 = Audio.load('stereo.wav')
>>> audio == audio3
True
>>> os.remove('stereo.wav')
Extract mono signal from a stereo one (`left` and `right` are instances of
:class:`Audio` as well):
>>> left = audio.channel(0)
>>> right = audio.channel(1)
>>> left.duration == right.duration == audio.duration
True
>>> left.nchannels == right.nchannels == 1
True
"""
import collections
import functools
import io
import os
import warnings
import wave
import numpy as np
import scipy.io.wavfile
import scipy.signal
import sox
import pydub
class Audio:
    """Create an audio signal with the given `data` and `sample_rate`

    Parameters
    ----------
    data : numpy array, shape = [nsamples, nchannels]
        The waveform audio signal, must be of one of the supported
        types (np.int16, np.int32, np.float32 or np.float64). Any
        array-like is accepted and converted with ``np.asarray``. A
        single-channel array of shape (nsamples, 1) is stored as
        (nsamples,).
    sample_rate : float
        The sample frequency of the `data`, in Hertz (stored as an int)
    validate : bool, optional
        When True, make sure the underlying data is valid (see
        :meth:`is_valid`), default to True

    Raises
    ------
    ValueError
        If `validate` is True and the audio data is not valid (see
        :meth:`is_valid`)

    """
    _metadata = collections.namedtuple(
        '_metadata', 'nchannels sample_rate nsamples duration')
    """A structure to store wavs metadata, see :meth:`Audio.scan`"""

    def __init__(self, data, sample_rate, validate=True):
        self._sample_rate = int(sample_rate)

        # accept any array-like, this is a no-op for numpy arrays
        data = np.asarray(data)

        # force shape (n, 1) to be (n,)
        self._data = (
            data[:, 0] if data.ndim > 1 and data.shape[1] == 1 else data)

        if validate and not self.is_valid():
            raise ValueError(f'invalid audio data for type {self.dtype}')

    def __eq__(self, other):
        # comparing with a foreign type must not raise AttributeError
        if not isinstance(other, Audio):
            return NotImplemented
        if self.sample_rate != other.sample_rate:
            return False
        return np.array_equal(self.data, other.data)

    # instances compare by value and wrap a mutable array: they are
    # explicitly not hashable (this is what defining __eq__ alone
    # already implies)
    __hash__ = None

    @property
    def data(self):
        """The numpy array of audio data"""
        return self._data

    @property
    def sample_rate(self):
        """The sample frequency of the signal in Hertz"""
        return self._sample_rate

    @property
    def duration(self):
        """The duration of the signal in seconds"""
        return self.nsamples / self.sample_rate

    @property
    def nchannels(self):
        """The number of audio channels in the signal"""
        if self.data.ndim == 1:
            return 1
        return self.data.shape[1]

    @property
    def nsamples(self):
        """The number of samples in the signal"""
        return self.data.shape[0]

    @property
    def shape(self):
        """Return the shape of the underlying data"""
        return self.data.shape

    @property
    def dtype(self):
        """The numeric type of samples"""
        return self.data.dtype

    @property
    def precision(self):
        """The number of bits per sample"""
        return self.dtype.itemsize * 8

    @classmethod
    @functools.lru_cache()
    def scan(cls, filename):
        """Returns the audio metadata without loading the file

        Returns a Python namespace (a named tuple) `metadata` with the
        following fields:

        - metadata.nchannels : int, number of channels
        - metadata.sample_rate : int, sample frequency in Hz
        - metadata.nsamples : int, number of audio samples in the file
        - metadata.duration : float, audio duration in seconds

        This method is useful to access the metadata of an audio file
        without loading it into memory, which is much faster than
        :func:`load`. The result is memoized per `filename`.

        Parameters
        ----------
        filename : str
            Audio filename on which to retrieve metadata, must be
            an existing file

        Returns
        -------
        metadata : namespace
            A namespace with fields as described above

        Raises
        ------
        ValueError
            If the `filename` does not exist or is not a valid audio
            file that ffmpeg can process.

        """
        filename = str(filename)
        if not os.path.isfile(filename):
            raise ValueError(f'{filename}: file not found')

        # using wave, very fast but supports only WAV files. An
        # EOFError is raised by wave on empty or truncated files, in
        # that case we give a chance to the pydub backend as well.
        try:
            with wave.open(filename, 'r') as wav:
                nframes = wav.getnframes()
                rate = wav.getframerate()
                return cls._metadata(
                    wav.getnchannels(), rate, nframes, nframes / rate)
        except (wave.Error, EOFError):
            pass

        # using pydub, cross-formats but very slow
        try:
            info = pydub.utils.mediainfo(filename)
            rate = int(info['sample_rate'])
            duration = float(info['duration'])
            return cls._metadata(
                int(info['channels']), rate, int(rate * duration), duration)
        except Exception:
            # deliberately broad: any failure here means the file
            # cannot be interpreted as audio
            raise ValueError(
                f'cannot scan audio file {filename}') from None

    # We use a memoize cache because Audio.load is often called to
    # load only segments of a file. The cache avoids reloading the
    # same file again and again to extract only a chunk of it. A
    # small maxsize is enough because accesses to audio chunks are
    # usually ordered.
    @classmethod
    @functools.lru_cache(maxsize=2)
    def load(cls, filename):
        """Creates an `Audio` instance from an audio file

        WAV files are read with scipy. If scipy cannot read the file,
        the file is decoded with pydub/ffmpeg. The loaded data is not
        validated. Results are memoized: loading the same `filename`
        twice in a row returns the same `Audio` instance.

        Parameters
        ----------
        filename : str
            Path to the audio file to load, must be an existing file

        Returns
        -------
        audio : Audio
            The Audio instance initialized from the `filename`

        Raises
        ------
        ValueError
            If the `filename` is not a valid audio file.

        """
        filename = str(filename)
        if not os.path.isfile(filename):
            raise ValueError(f'{filename}: file not found')

        # load the audio signal
        try:
            # first try with scipy. It only supports wav files, but support
            # float32 wavs (which pydub/ffmpeg or sox don't support)
            rate, data = scipy.io.wavfile.read(filename)
        except ValueError:
            try:
                # if scipy failed (mostly because it is not a WAV file), give a
                # try to ffmpeg with pydub
                segment = pydub.AudioSegment.from_file(filename)
                rate = segment.frame_rate
                # one row per channel, transposed to (nsamples, nchannels)
                data = np.atleast_2d(np.array(
                    [c.get_array_of_samples()
                     for c in segment.split_to_mono()])).T
            except pydub.exceptions.PydubException as err:
                raise ValueError(
                    f'{filename}: cannot read file, {err}') from None

        return cls(data, rate, validate=False)

    def save(self, filename):
        """Saves the audio data to a `filename`

        Parameters
        ----------
        filename : str
            The audio file to create, format is guessed from extension

        Raises
        ------
        ValueError
            If the file already exists, has no extension, or if the
            WAV file cannot be written

        """
        filename = str(filename)
        if os.path.isfile(filename):
            raise ValueError(f'{filename}: file already exists')

        # the extension is read from the file name only, so that a dot
        # in a parent directory is not mistaken for an extension
        _, dot, extension = os.path.basename(filename).rpartition('.')
        if not dot or not extension:
            raise ValueError(
                f'{filename}: cannot write audio file without extension')

        if extension.lower() == 'wav':
            # saving wav files using scipy
            try:
                scipy.io.wavfile.write(filename, self.sample_rate, self.data)
            except ValueError as err:  # pragma: nocover
                raise ValueError(
                    f'{filename}: cannot write file, {err}') from None
        else:
            # all other audio extensions are handled by pydub/ffmpeg.
            self._aspydub().export(filename, format=extension)

    def _aspydub(self):
        """Converts the audio to a pydub.AudioSegment instance"""
        # go through an in-memory WAV file
        with io.BytesIO() as wav:
            scipy.io.wavfile.write(wav, self.sample_rate, self.data)
            wav.seek(0)
            return pydub.AudioSegment.from_wav(wav)

    def channel(self, index):
        """Builds a mono signal from a multi-channel one

        Parameters
        ----------
        index : int
            The audio channel to extract from the original signal

        Returns
        -------
        mono : Audio
            The extracted single-channel data. When the signal is
            already mono and `index` is 0, returns the instance itself.

        Raises
        ------
        ValueError
            If `index` >= :func:`nchannels`, or if the extracted
            channel is not valid audio data

        """
        if index == 0 and self.nchannels == 1:
            return self

        if index >= self.nchannels:
            raise ValueError(
                f'not enough channels ({self.nchannels}) to extract '
                f'the index {index} (indices count starts at 0)')

        return Audio(self.data[:, index], self.sample_rate)

    def resample(self, sample_rate, backend='sox'):
        """Returns the audio signal resampled at the given `sample_rate`

        This method relies on `pysox <https://github.com/rabitt/pysox>`_
        when `backend` is 'sox', or on `scipy.signal.resample` when
        `backend` is 'scipy'. The sox backend is very fast and accurate
        but relies on an external binary, whereas the scipy backend can
        be very slow but needs no external binary. There is no
        automatic fallback from one backend to the other.

        Parameters
        ----------
        sample_rate : int
            The sample frequency used to resample the signal, in Hz
        backend : str, optional
            The backend to use for resampling, must be 'sox' or
            'scipy', default to 'sox'

        Returns
        -------
        audio : Audio
            An Audio instance containing the resampled signal

        Raises
        ------
        ValueError
            If the `backend` is not 'sox' or 'scipy', or if the
            resampling failed

        """
        if backend not in ('sox', 'scipy'):
            raise ValueError(f'backend must be sox or scipy, it is {backend}')

        if backend == 'sox':
            return self._resample_sox(sample_rate)
        return self._resample_scipy(sample_rate)

    def _resample_sox(self, sample_rate):
        """Resample the audio signal to the given `sample_rate` using sox"""
        try:
            tfm = sox.Transformer()
            tfm.set_output_format(rate=sample_rate)
            data = tfm.build_array(
                input_array=self.data, sample_rate_in=self.sample_rate)
            return Audio(data, sample_rate, validate=False)
        except (sox.core.SoxError, ValueError) as err:
            raise ValueError(
                f'resampling at {sample_rate} failed!') from err

    def _resample_scipy(self, sample_rate):
        """Resample the audio signal to the given `sample_rate` using scipy"""
        if sample_rate == self.sample_rate:
            return self

        # number of samples in the resampled signal
        nsamples = int(self.nsamples * sample_rate / self.sample_rate)

        # scipy method issues warnings we want to inhibit
        with warnings.catch_warnings():
            warnings.simplefilter('ignore', category=FutureWarning)
            data = scipy.signal.resample(self.data, nsamples)

        # the resampled signal can overshoot the original amplitude:
        # clip it before casting back to an integer type, otherwise
        # out-of-range samples would wrap around
        if self.dtype.kind == 'i':
            emin, emax = self._bounds(self.dtype)
            data = np.clip(data, emin, emax)

        # resampling cast to float64, reformat to the original dtype
        return Audio(data.astype(self.dtype), sample_rate, validate=False)

    @staticmethod
    def _is_valid_dtype(dtype):
        """Returns True if `dtype` is a supported data type, False otherwise"""
        if dtype is None:
            # np.dtype(None) silently means float64, refuse it
            return False
        try:
            dtype = np.dtype(dtype)
        except TypeError:
            return False

        supported_types = [np.dtype(t) for t in (
            np.int16, np.int32, np.float32, np.float64)]
        return dtype in supported_types

    @staticmethod
    def _bounds(dtype):
        """Returns the (min, max) sample values allowed for `dtype`"""
        dtype = np.dtype(dtype)
        if dtype.kind == 'i':
            info = np.iinfo(dtype)
            return int(info.min), int(info.max)
        return -1, 1  # float32 or float64

    @staticmethod
    def _full_scale(dtype):
        """Returns the amplitude matching 1.0 in float for `dtype`"""
        dtype = np.dtype(dtype)
        if dtype.kind == 'i':
            return 2 ** (8 * dtype.itemsize - 1)
        return 1

    def is_valid(self):
        """Returns True if the audio data is valid, False otherwise

        An `Audio` instance is valid if the underlying data type
        is supported (must be np.int16, np.int32, np.float32 or
        np.float64), and if the samples min/max are within the
        expected boundaries for the given data type: the full integer
        range for np.int16 and np.int32, [-1, 1] for floats. An empty
        signal of a supported type is valid. A warning is issued when
        the data is not valid.

        """
        # make sure the data type is valid
        if not self._is_valid_dtype(self.dtype):
            warnings.warn(f'unsupported audio data type: {self.dtype}')
            return False

        # no sample can be out of bounds (and min/max are undefined
        # on empty arrays)
        if self.data.size == 0:
            return True

        # get the theoretical min/max
        emin, emax = self._bounds(self.dtype)

        # get the data min/max and checks they are within theoretical
        # boundaries
        dmin = np.amin(self.data)
        dmax = np.amax(self.data)
        if dmin < emin or dmax > emax:
            warnings.warn(
                f'invalid audio for type {self.dtype}: '
                f'boundaries must be in ({emin}, {emax}) '
                f'but are ({dmin}, {dmax})')
            return False

        return True

    def astype(self, dtype):
        """Returns the audio signal converted to the `dtype` numeric type

        The valid types are np.int16, np.int32, np.float32 or
        np.float64. Samples are rescaled so that the full range of the
        original type maps to the full range of the requested one
        (2**15 for np.int16, 2**31 for np.int32, 1.0 for floats).
        When converting to an integer type the samples are clipped to
        the valid range and truncated toward zero.

        Parameters
        ----------
        dtype : numeric type
            Must be an integer or a floating-point type in the types
            described above. Anything understood by ``np.dtype`` is
            accepted (e.g. ``np.int16``, ``np.dtype('int16')`` or
            ``'int16'``).

        Returns
        -------
        audio : Audio
            The converted signal, or the instance itself if it already
            has the requested `dtype`

        Raises
        ------
        ValueError
            If the requested `dtype` is not supported

        """
        try:
            target = None if dtype is None else np.dtype(dtype)
        except TypeError:
            target = None

        # do nothing if we already have the requested dtype
        if target is not None and self.dtype == target:
            return self

        # make sure we support the requested dtype
        if target is None or not self._is_valid_dtype(target):
            raise ValueError(f'unsupported audio data type: {dtype}')

        source_scale = self._full_scale(self.dtype)
        target_scale = self._full_scale(target)
        from_int = self.dtype.kind == 'i'

        if target.kind == 'i':
            if from_int and target_scale >= source_scale:
                # widening integer conversion: cast first so that the
                # multiplication is done (exactly) in the wider type
                data = self.data.astype(target) * (
                    target_scale // source_scale)
            else:
                # float64 represents exactly every int32 value. Clip
                # because e.g. 1.0 maps to 2**15, one above int16 max.
                emin, emax = self._bounds(target)
                data = np.clip(
                    self.data.astype(np.float64)
                    * (target_scale / source_scale),
                    emin, emax)
        elif from_int:
            data = self.data.astype(np.float64) / source_scale
        else:  # float to float, nothing to rescale
            data = self.data

        return Audio(data.astype(target), self.sample_rate, validate=False)

    def segment(self, segments):
        """Returns audio chunks segmented from the original signal

        Parameters
        ----------
        segments : list of pairs of floats
            A list of pairs (tstart, tstop) of the start and stop
            indices (in seconds) of the signal chunks we are going to
            extract. The times `tstart` and `tstop` must be float,
            with `tstart` < `tstop`.

        Returns
        -------
        chunks : list of Audio
            The signal chunks created from the given `segments`, not
            validated

        Raises
        ------
        ValueError
            If one element in `segments` is not a pair of float or if
            `tstart` >= `tstop`. If `segments` is not a list.

        """
        # ensure segments is well formatted
        if not isinstance(segments, list):
            raise ValueError('segments must be a list')

        for segment in segments:
            try:
                npoints = len(segment)
            except TypeError:
                raise ValueError(
                    'segments elements must be pairs') from None
            if npoints != 2:
                raise ValueError('segments elements must be pairs')

            if segment[0] >= segment[1]:
                raise ValueError('time indices in segments must be sorted')

        # convert times (in seconds) to sample indices and slice
        rate = self.sample_rate
        return [
            Audio(
                self.data[int(segment[0] * rate):int(segment[1] * rate)],
                rate, validate=False)
            for segment in segments]