# Source code for ABXpy.sampling.sampler

```"""The sampler class implementing incremental sampling without replacement.

Incremental meaning that you don't have to draw the whole sample at
once, instead at any given time you can get a piece of the sample of a
size you specify. This is useful for very large sample sizes.

"""

import numpy as np
import math

class IncrementalSampler(object):
    """Class for sampling without replacement in an incremental fashion.

    Toy example of usage::

        sampler = IncrementalSampler(10**4, 10**4, step=100,
                                     relative_indexing=False)
        complete_sample = np.concatenate([sample for sample in sampler])
        assert all(complete_sample == range(10**4))

    More realistic example of usage: sampling without replacement 1
    million items from a total of 1 trillion items, considering 100
    million items at a time::

        sampler = IncrementalSampler(10**12, 10**6, step=10**8,
                                     relative_indexing=False)
        complete_sample = np.concatenate([sample for sample in sampler])
    """

    # sampling K items from a population of size N;
    # both K and N can be very large
    def __init__(self, N, K, step=None, relative_indexing=True,
                 dtype=np.int64):
        """Set up a sampler drawing K items without replacement out of N.

        Parameters
        ----------
        N : int
            total population size (can be very large)
        K : int
            total number of items to sample, K <= N
        step : int, optional
            number of population items considered by each iteration step;
            defaults to 10**4 * N // K, i.e. about 10**4 sampled items
            per iteration on average
        relative_indexing : bool, optional
            if True (default) each chunk's indices are relative to the
            current position in the population, otherwise they are absolute
        dtype : numpy dtype, optional
            dtype of the elements of the returned sample arrays
        """
        assert K <= N
        self.N = N  # remaining items to sample from
        self.K = K  # remaining items to be sampled
        self.initial_N = N  # total size, kept for absolute indexing
        self.relative_indexing = relative_indexing
        self.type = dtype  # the type of the elements of the sample
        # step used when iterating over the sampler
        if step is None:
            # 10**4 samples by iteration on average; if K == 0 there is
            # nothing to sample, so consume the population in one pass
            # (avoids a ZeroDivisionError)
            self.step = 10 ** 4 * N // K if K > 0 else N
        else:
            self.step = step

    # methods implementing the iterator protocol
    def __iter__(self):
        return self

    def __next__(self):
        """Return the sample drawn from the next `self.step` items."""
        if self.N == 0:
            raise StopIteration
        return self.sample(self.step)

    # Python 2 compatibility alias (the original API exposed `next`)
    next = __next__

    def sample(self, n, dtype=None):
        """Fast implementation of the sampling function

        Get all samples from the next n items in a way that avoids rejection
        sampling with too large samples, more precisely samples whose expected
        number of sampled items is larger than 10**5.

        Parameters
        ----------
        n : int
            the size of the chunk
        dtype : numpy dtype, optional
            if given, overrides the dtype chosen at construction time
            (previously the default silently reset the dtype to int64
            on every call)

        Returns
        -------
        sample : numpy.array
            the indices to keep given relative to the current position
            in the sample or absolutely, depending on the value of
            relative_indexing specified when initialising the sampler
            (default value is True)

        """
        if dtype is not None:
            self.type = dtype
        position = self.initial_N - self.N
        if n > self.N:
            n = self.N
        if n <= 0:
            # nothing left to consider: also avoids a division by zero below
            return np.array([], dtype=self.type)

        # expected number of sampled items among the next n
        expected_k = n * self.K / float(self.N)
        if expected_k > 10 ** 5:
            # split the chunk so each piece yields ~10**5 sampled items,
            # keeping the underlying rejection sampling efficient
            sample = []
            chunk_size = int(np.floor(10 ** 5 * self.N / float(self.K)))
            i = 0
            while n > 0:
                amount = min(chunk_size, n)
                # shift each piece's relative indices by its start offset
                sample.append(self.simple_sample(amount) + i * chunk_size)
                n = n - amount
                i += 1
            sample = np.concatenate(sample)
        else:
            sample = self.simple_sample(n)

        if not self.relative_indexing:
            sample = sample + position

        return sample

    def simple_sample(self, n):
        """get all samples from the next n items in a naive fashion

        Parameters
        ----------
        n : int
            the size of the chunk

        Returns
        -------
        sample : numpy.array
            the indices to be kept relative to the current position
            in the sample
        """
        k = hypergeometric_sample(self.N, self.K, n)  # get the sample size
        sample = sample_without_replacement(k, n, self.type)
        self.N = self.N - n
        self.K = self.K - k
        return sample

# function np.random.hypergeometric is buggy so I did my own
# implementation...  (error, at least, line 784 in computation of
# variance: sample used instead of m, but this can't be all of it ?)
# following algo HRUA by Ernst Stadlober as implemented in numpy
# (https://github.com/numpy/numpy/blob/master/numpy/random/mtrand/
# distributions.c and see original ref in zotero)
# this is 100 to 200 times slower than np.random.hypergeometric, but
# it works reliably; could be optimized a lot if needed (for small
# samples in particular but also generally)
# seems at worst to require comparable execution time when compared to the
# actual rejection sampling, so probably not going to be so bad all in all
def hypergeometric_sample(N, K, n):
    """This function return the number of elements to sample from the next n
    items.

    Draws from the hypergeometric distribution (population N, K marked
    items, n trials) using a ratio-of-uniforms rejection loop in the
    style of Stadlober's HRUA algorithm.

    Parameters
    ----------
    N : int
        remaining population size
    K : int
        remaining number of items to be sampled, K <= N
    n : int
        size of the next chunk; assumed n <= N (callers guarantee this —
        TODO confirm for direct external use)

    Returns
    -------
    k : int
        number of elements to sample from the next n items
    """
    # handling edge cases
    if N == 0 or N == 1:
        # N == 0 implies K == 0; N == 1 implies the chunk is the whole
        # remaining population, so all K remaining items must be taken
        k = K
    else:
        # using symmetries to speed up computations
        # if the probability of failure is smaller than the probability of
        # success, draw the failure count
        K_eff = min(K, N - K)
        # if the amount of items to sample from is larger than the amount of
        # items that will remain, draw from the items that will remain
        n_eff = min(n, N - n)
        N_float = np.float64(N)  # useful to avoid unexpected roundings

        average = n_eff * (K_eff / N_float)
        mode = np.floor((n_eff + 1) * ((K_eff + 1) / (N_float + 2)))
        variance = average * ((N - K_eff) / N_float) * \
            ((N - n_eff) / (N_float - 1))
        # envelope constants of the ratio-of-uniforms method
        c1 = 2 * np.sqrt(2 / np.e)
        c2 = 3 - 2 * np.sqrt(3 / np.e)
        a = average + 0.5
        b = c1 * np.sqrt(variance + 0.5) + c2
        # sum of log-factorials at the mode: negative log of the
        # unnormalized pmf there, used as reference in the tests below
        p_mode = (math.lgamma(mode + 1) + math.lgamma(K_eff - mode + 1) +
                  math.lgamma(n_eff - mode + 1) +
                  math.lgamma(N - K_eff - n_eff + mode + 1))
        # 16 for 16-decimal-digit precision in c1 and c2 (?)
        upper_bound = min(
            min(n_eff, K_eff) + 1, np.floor(a + 16 * np.sqrt(variance + 0.5)))

        # rejection loop: propose k from the ratio-of-uniforms envelope,
        # then accept/reject against the (log) pmf ratio
        while True:
            U = np.random.rand()
            V = np.random.rand()
            k = np.int64(np.floor(a + b * (V - 0.5) / U))
            if k < 0 or k >= upper_bound:
                # proposal outside the support: redraw
                continue
            else:
                p_k = math.lgamma(k + 1) + math.lgamma(K_eff - k + 1) + \
                    math.lgamma(n_eff - k + 1) + \
                    math.lgamma(N - K_eff - n_eff + k + 1)
                # d = log(pmf(k) / pmf(mode)) <= 0
                d = p_mode - p_k
                # cheap squeeze test: accept without computing the log
                if U * (4 - U) - 3 <= d:
                    break
                # cheap squeeze test: reject without computing the log
                if U * (U - d) >= 1:
                    continue
                # full acceptance test
                if 2 * np.log(U) <= d:
                    break

        # retrieving original variables by symmetry (order matters:
        # undo the K symmetry first, then the n symmetry)
        if K_eff < K:
            k = n_eff - k
        if n_eff < n:
            k = K - k

    return k

# returns uniform samples in [0, N-1] without replacement the values
# 0.6 and 100 are based on empirical tests of the functions and would
# need to be changed if the functions are changed
def sample_without_replacement(n, N, dtype=np.int64):
    """Returns uniform samples in [0, N-1] without replacement. It will use
    Knuth sampling or rejection sampling depending on the parameters n and N.

    .. note::

        the values 0.6 and 100 are based on empirical tests of the
        functions and would need to be changed if the functions are
        changed

    """
    # rejection sampling only pays off when the population is large and
    # the requested sample is a small fraction of it
    use_rejection = N > 100 and n / float(N) < 0.6
    if use_rejection:
        return rejection_sampling(n, N, dtype)
    return Knuth_sampling(n, N, dtype)

# this one would benefit a lot from being cythonized, efficient if n
# close to N (np.random.choice with replace=False is cythonized and
# similar in spirit but not better because it shuffles the whole array
# of size N which is wasteful; once cythonized Knuth_sampling should
# be superior to it in all situation)
def Knuth_sampling(n, N, dtype=np.int64):
    """This is the usual sampling function when n is comparable to N

    Scans items 0..N-1 in order and selects each one with the appropriate
    conditional probability (Knuth's selection-sampling technique), so the
    returned indices are sorted and exactly n of them are produced.
    """
    size = int(n)
    result = np.zeros(shape=size, dtype=dtype)

    seen = 0      # total input records dealt with
    selected = 0  # number of items selected so far
    while selected < size:
        u = np.random.rand()
        # keep item `seen` with probability (size - selected) / (N - seen)
        if (N - seen) * u < size - selected:
            result[selected] = seen
            selected += 1
        seen += 1
    return result

# maybe use array for the first iteration then use python native sets
# for faster set operations ?
def rejection_sampling(n, N, dtype=np.int64):
    """Using rejection sampling to keep a good performance if n << N

    Repeatedly draws uniform integers in [0, N-1], accumulating the
    distinct values until n of them have been collected; the returned
    array is sorted and duplicate-free.
    """
    target = n
    kept = np.array([], dtype=dtype)
    missing = target
    while missing > 0:
        draws = np.random.randint(0, int(N), int(missing)).astype(dtype)
        # union1d deduplicates and sorts the accumulated sample
        kept = np.union1d(kept, np.unique(draws))
        missing = target - kept.shape[0]
    return kept

# Profiling hypergeometric sampling + sampling without replacement together:

# ChunkSize
# 10**2:
#     hyper: 46s
#     sample: 32s
# 10**3:
#     hyper : 4.5s
#     sample 6s
# 10**4:
#     hyper 0.5s
#     sample 4.5s
# 10**5:
#     hyper 0.05s
#     sample 4s
# 10**6:
#     hyper 0.007s
#     sample 5.7s
# 10**7:
#     hyper 0.001s
#     sample 10.33s
# + memory increase with chunk size

# Should aim at having samples with around 100 000 elements.
# This means sampling in 10**5 * sampled_proportion chunks.

# profiling code:

# import time
# tt=[]
# ra = range(8, 9)
# for block_size in ra:
#     t = time.clock()
#     progress = 0
#     b = 10**block_size
#     for i in range(10**12//(10**3*b)):
#         r = s.sample(10**3*b)
#         progress = progress+100*(len(r)/10.**9)
#         print(progress)
#         if progress > 3:
#             break
#     tt.append(time.clock()-t)
# for e, b in zip(tt, ra):
#     print(b)
#     print(e)

# ## Profiling rejection sampling and Knuth sampling ###
# could create an automatic test for finding the turning point and offset
# between Knuth and rejection

# manual results:
# N	100
# n
# 1	R:60mu, K:30mu
# 10  R:83mu, K:54mu
# 100 R:7780mu, K:78mu
# N < 100 always Knuth
#
# N	1000
# n
# 1	R:60mu, K:248mu
# 10  R:65mu, K:450mu
# 100 R:150mu, K:523mu
# 1000 R:8610mu, K:785mu
# turning point: n/N between 0.5 and 0.75
#
# N 10**6
# 10**6 R:???, K:791ms
# 10**4 R:1ms, K:562ms
# 10**5 R:20ms, K:531ms
# turning point: n/N between 0.5 and 0.75
#
# N 10**9
# 10**6 R:174ms
# 10**7 R:2.7s
#
# N 10**18
# 1 R: 62mu
# 10 R: 62mu
# 10**3 R: 148mu
# 10**6 R: 131ms