# -*- coding: utf-8 -*-
"""
===============================================================================
Custom iterator functions (:mod:`sknano.core._itertools`)
===============================================================================
.. currentmodule:: sknano.core._itertools
"""
from __future__ import absolute_import, division, print_function, \
unicode_literals
__docformat__ = 'restructuredtext en'
from collections import Iterable
import collections
import operator
import random
from itertools import chain, combinations, count, cycle, filterfalse, \
islice, repeat, starmap, tee, zip_longest
__all__ = ['cyclic_pairs', 'take', 'tabulate', 'consume', 'nth', 'quantify',
'padnone', 'ncycles', 'dotproduct', 'flatten', 'repeatfunc',
'pairwise', 'grouper', 'roundrobin', 'partition', 'powerset',
'unique_elements', 'iter_except', 'first_true',
'random_product', 'random_permutation', 'random_combination',
'random_combination_with_replacement']
[docs]def cyclic_pairs(iterable):
"""
Generate a cyclic list of all subsequence pairs in `iterable`.
Returns
-------
:class:`~python:list`
Examples
--------
>>> cyclic_pairs('ABC')
[('A', 'B'), ('B', 'C'), ('C', 'A')]
"""
a, b = tee(iterable)
return list(zip(a, chain(b, [next(b)])))
[docs]def take(n, iterable):
"Return first n items of the iterable as a list"
return list(islice(iterable, n))
[docs]def tabulate(function, start=0):
"Return function(0), function(1), ..."
return map(function, count(start))
[docs]def consume(iterator, n):
"Advance the iterator n-steps ahead. If n is none, consume entirely."
# Use functions that consume iterators at C speed.
if n is None:
# feed the entire iterator into a zero-length deque
collections.deque(iterator, maxlen=0)
else:
# advance to the empty slice starting at position n
next(islice(iterator, n, n), None)
[docs]def nth(iterable, n, default=None):
"Returns the nth item or a default value"
return next(islice(iterable, n, None), default)
[docs]def quantify(iterable, pred=bool):
"Count how many times the predicate is true"
return sum(map(pred, iterable))
[docs]def padnone(iterable):
"""Returns the sequence elements and then returns None indefinitely.
Useful for emulating the behavior of the built-in map() function.
"""
return chain(iterable, repeat(None))
[docs]def ncycles(iterable, n):
"Returns the sequence elements n times"
return chain.from_iterable(repeat(tuple(iterable), n))
[docs]def dotproduct(vec1, vec2):
return sum(map(operator.mul, vec1, vec2))
[docs]def flatten(items, ignore_types=(str, bytes)):
"Flatten nested sequence into a single list of values."
for x in items:
if isinstance(x, Iterable) and not isinstance(x, ignore_types):
yield from flatten(x)
else:
yield x
[docs]def repeatfunc(func, times=None, *args):
"""Repeat calls to func with specified arguments.
Example: repeatfunc(random.random)
"""
if times is None:
return starmap(func, repeat(args))
return starmap(func, repeat(args, times))
[docs]def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = tee(iterable)
next(b, None)
return zip(a, b)
[docs]def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
[docs]def roundrobin(*iterables):
"roundrobin('ABC', 'D', 'EF') --> A D E B F C"
# Recipe credited to George Sakkis
pending = len(iterables)
nexts = cycle(iter(it).__next__ for it in iterables)
while pending:
try:
for next in nexts:
yield next()
except StopIteration:
pending -= 1
nexts = cycle(islice(nexts, pending))
[docs]def partition(pred, iterable):
"Use a predicate to partition entries into false entries and true entries"
# partition(is_odd, range(10)) --> 0 2 4 6 8 and 1 3 5 7 9
t1, t2 = tee(iterable)
return filterfalse(pred, t1), filter(pred, t2)
[docs]def powerset(iterable):
"powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"
s = list(iterable)
return chain.from_iterable(combinations(s, r) for r in range(len(s) + 1))
[docs]def unique_elements(iterable, key=None):
"""Return generator of unique elements in `iterable`, preserving order.
Examples
--------
>>> list(unique_elements('AAAABBBCCDAABBB'))
['A', 'B', 'C', 'D']
>>> list(unique_elements('ABBCcAD'))
['A', 'B', 'C', 'c', 'D']
>>> list(unique_elements('ABBCcAD', str.lower))
['A', 'B', 'C', 'D']
"""
seen = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
[docs]def iter_except(func, exception, first=None):
"""Call a function repeatedly until an exception is raised.
Converts a call-until-exception interface to an iterator interface.
Like builtins.iter(func, sentinel) but uses an exception instead
of a sentinel to end the loop.
Parameters
----------
func : `callable`
exception : `Exception`
first : `callable`
for database APIs needing an initial cast to db.first()
Examples
--------
priority queue iterator:
>>> # iter_except(functools.partial(heappop, h), IndexError)
non-blocking dict iterator:
>>> # iter_except(d.popitem, KeyError)
non-blocking deque iterator:
>>> # iter_except(d.popleft, IndexError)
loop over a producer Queue:
>>> # iter_except(q.get_nowait, Queue.Empty)
non-blocking set iterator:
>>> # iter_except(s.pop, KeyError)
"""
try:
# For database APIs needing an initial cast to db.first()
if first is not None:
yield first()
while 1:
yield func()
except exception:
pass
[docs]def first_true(iterable, default=False, pred=None):
"""Returns the first true value in the iterable.
If no true value is found, returns *default*
If *pred* is not None, returns the first item
for which pred(item) is true.
Parameters
----------
default : `bool`
pred : {None, `callable`}, optional
Examples
--------
>>> # first_true([a,b,c], x)
>>> # a or b or c or x
>>> # first_true([a,b], x, f)
>>> # a if f(a) else b if f(b) else x
"""
return next(filter(pred, iterable), default)
[docs]def random_product(*args, repeat=1):
"""Random selection from :func:`~python:itertools.product`."""
pools = [tuple(pool) for pool in args] * repeat
return tuple(random.choice(pool) for pool in pools)
[docs]def random_permutation(iterable, r=None):
"""Random selection from :func:`~python:itertools.permutations`."""
pool = tuple(iterable)
r = len(pool) if r is None else r
return tuple(random.sample(pool, r))
[docs]def random_combination(iterable, r):
"""Random selection from :func:`~python:itertools.combinations`."""
pool = tuple(iterable)
n = len(pool)
indices = sorted(random.sample(range(n), r))
return tuple(pool[i] for i in indices)
[docs]def random_combination_with_replacement(iterable, r):
"""Random selection from \
:func:`~python:itertools.combinations_with_replacement`.
"""
pool = tuple(iterable)
n = len(pool)
indices = sorted(random.randrange(n) for i in range(r))
return tuple(pool[i] for i in indices)