# -*- coding: utf-8 -*-
"""
===============================================================================
Custom iterator functions (:mod:`sknano.core.itertools`)
===============================================================================
.. currentmodule:: sknano.core.itertools
The functions defined in this module are from the
|itertools-recipes|_ for Python 3 or from the
Python Cookbook, Third Edition [PythonCookbook]_.
.. |itertools-recipes| replace:: Itertools Recipes
.. _itertools-recipes:
https://docs.python.org/3/library/itertools.html#itertools-recipes
.. [PythonCookbook] Python Cookbook, Third Edition
by David Beazley and Brian K. Jones.
"""
from __future__ import absolute_import, division, print_function, \
unicode_literals
__docformat__ = 'restructuredtext en'
from collections import Iterable
from operator import itemgetter
import operator
import collections
import random
from itertools import chain, combinations, count, cycle, filterfalse, \
groupby, islice, repeat, starmap, tee, zip_longest
# Public names exported by a star-import of this module; each entry is the
# name of a function defined below.
__all__ = ['cyclic_pairs', 'dedupe', 'take', 'tabulate', 'tail', 'consume',
'nth', 'all_equal', 'quantify', 'padnone', 'ncycles', 'dotproduct',
'flatten', 'repeatfunc', 'pairwise', 'grouper', 'roundrobin',
'partition', 'powerset', 'unique_elements', 'unique_everseen',
'unique_justseen', 'iter_except', 'first_true', 'random_product',
'random_permutation', 'random_combination',
'random_combination_with_replacement']
def dedupe(items, key=None):
    """Yield the entries of `items` in order, skipping repeated values.

    Parameters
    ----------
    items : iterable
        Values to filter.  Each value (or its `key` result) must be
        hashable, because seen values are stored in a set.
    key : {None, callable}, optional
        Function converting an item into the hashable value used for
        duplicate detection.  When `None`, the item itself is used.

    Yields
    ------
    object
        Every item whose detection value has not been encountered before.

    Examples
    --------
    >>> a = [{'x': 1, 'y': 2}, {'x': 1, 'y': 3},
    ...      {'x': 1, 'y': 2}, {'x': 2, 'y': 4}]
    >>> list(dedupe(a, key=lambda d: (d['x'], d['y'])))
    [{'x': 1, 'y': 2}, {'x': 1, 'y': 3}, {'x': 2, 'y': 4}]
    >>> list(dedupe(a, key=lambda d: d['x']))
    [{'x': 1, 'y': 2}, {'x': 2, 'y': 4}]

    """
    observed = set()
    for entry in items:
        marker = entry if key is None else key(entry)
        if marker in observed:
            continue
        yield entry
        observed.add(marker)
def cyclic_pairs(iterable):
    """Return an iterator over consecutive pairs of `iterable`, wrapping
    around so that the final pair joins the last element to the first.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`

    Returns
    -------
    :class:`~python:collections.abc.Iterator`
        Iterator of 2-tuples.  It is empty when `iterable` is empty, and a
        single-element input yields that element paired with itself.

    Examples
    --------
    >>> list(cyclic_pairs('ABC'))
    [('A', 'B'), ('B', 'C'), ('C', 'A')]
    >>> list(cyclic_pairs(''))
    []

    """
    a, b = tee(iterable)
    missing = object()
    # Pull the first element off `b` with a sentinel default so that an
    # empty input does not leak a bare StopIteration to the caller.
    first = next(b, missing)
    if first is missing:
        return zip()
    # `b` now starts at the second element; re-appending the first element
    # closes the cycle.
    return zip(a, chain(b, (first,)))
def take(n, iterable):
    """Collect up to the first `n` items of `iterable` into a list.

    Parameters
    ----------
    n : :class:`~python:int`
        Maximum number of items to collect.
    iterable : :class:`~python:collections.abc.Iterable`

    Returns
    -------
    :class:`~python:list`
        Holds fewer than `n` items when `iterable` runs out first.

    Examples
    --------
    >>> numbers = iter(range(10))
    >>> take(5, numbers)
    [0, 1, 2, 3, 4]
    >>> take(5, numbers)
    [5, 6, 7, 8, 9]

    """
    leading = islice(iterable, n)
    return list(leading)
def tabulate(function, start=0):
    """Return an iterator applying `function` to consecutive integers.

    The iterator produces ``function(start)``, ``function(start + 1)``, ...
    without end.

    Parameters
    ----------
    function : callable
        Called with one integer argument per produced item.
    start : :class:`~python:int`, optional
        First integer passed to `function`.

    Returns
    -------
    :class:`~python:collections.abc.Iterator`

    Examples
    --------
    >>> from itertools import islice
    >>> squares = tabulate(lambda i: i * i, 2)
    >>> list(islice(squares, 3))
    [4, 9, 16]
    >>> list(islice(squares, 2))
    [25, 36]

    """
    arguments = count(start)
    return map(function, arguments)
def tail(n, iterable):
    """Return an iterator over the final `n` items of `iterable`.

    Parameters
    ----------
    n : :class:`~python:int`
        Number of trailing items to keep.
    iterable : :class:`~python:collections.abc.Iterable`

    Returns
    -------
    :class:`~python:collections.abc.Iterator`

    Examples
    --------
    >>> list(tail(3, 'ABCDEFG'))
    ['E', 'F', 'G']

    """
    # A bounded deque discards the oldest entries as newer ones arrive, so
    # only the last `n` items remain once `iterable` has been read through.
    window = collections.deque(iterable, maxlen=n)
    return iter(window)
def consume(iterator, n=None):
    """Advance `iterator` by `n` steps, or exhaust it when `n` is `None`.

    Parameters
    ----------
    iterator : :class:`~python:collections.abc.Iterator`
        Iterator to advance in place.
    n : {:class:`~python:None`, :class:`~python:int`}, optional
        Number of items to skip.  `None` skips everything that is left.

    """
    if n is not None:
        # Requesting the empty slice [n:n] makes islice skip ahead to
        # position n; the default of None absorbs the end-of-slice signal.
        next(islice(iterator, n, n), None)
        return
    # A zero-length deque reads every item and stores none of them.
    collections.deque(iterator, maxlen=0)
def nth(iterable, n, default=None):
    """Return the item at zero-based position `n`, or `default` if absent.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`
    n : :class:`~python:int`
        Zero-based position of the wanted item.
    default : {:class:`~python:None`, :class:`~python:object`}, optional
        Value returned when `iterable` has no item at position `n`.

    Examples
    --------
    >>> nth(range(10), 3)
    3
    >>> nth(range(10), 20, default='donkey')
    'donkey'

    """
    from_position = islice(iterable, n, None)
    return next(from_position, default)
def all_equal(iterable):
    """Report whether every element of `iterable` equals the others.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`

    Returns
    -------
    :class:`~python:bool`
        `True` when the elements form at most one run of equal values,
        which includes the empty iterable; `False` otherwise.

    Examples
    --------
    >>> all_equal('AAAA')
    True
    >>> all_equal('AAAB')
    False

    """
    runs = groupby(iterable)
    # The first run (or the True placeholder for empty input) is always
    # truthy; equality therefore hinges on there being no second run.
    first_run = next(runs, True)
    second_run = next(runs, False)
    return bool(first_run) and not second_run
def quantify(iterable, pred=bool):
    """Sum the results of `pred` over the items of `iterable`.

    With a predicate returning booleans this is the number of items for
    which the predicate holds.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`
    pred : callable, optional
        Applied to each item.  Defaults to :class:`~python:bool`.

    Examples
    --------
    >>> quantify([True, False, True])
    2

    """
    return sum(pred(item) for item in iterable)
def padnone(iterable):
    """Return an iterator over `iterable` followed by endless `None` values.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`

    Returns
    -------
    :class:`~python:collections.abc.Iterator`
        Never terminates; bound it with a slice when consuming it.

    Examples
    --------
    >>> from itertools import islice
    >>> list(islice(padnone(range(3)), 5))
    [0, 1, 2, None, None]

    """
    padding = repeat(None)
    return chain(iterable, padding)
def ncycles(iterable, n):
    """Return an iterator running through the elements of `iterable` `n`
    times in a row.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`
        Read once and stored as a tuple so it can be replayed.
    n : :class:`~python:int`
        Number of passes over the elements.

    Examples
    --------
    >>> list(ncycles('AB', 3))
    ['A', 'B', 'A', 'B', 'A', 'B']

    """
    snapshot = tuple(iterable)
    passes = repeat(snapshot, n)
    return chain.from_iterable(passes)
def dotproduct(vec1, vec2):
    """Return the sum of the pairwise products of two iterables.

    Pairing stops at the end of the shorter input.

    Parameters
    ----------
    vec1, vec2 : :class:`~python:collections.abc.Iterable`

    Examples
    --------
    >>> dotproduct([1, 2, 3], [4, 5, 6])
    32

    """
    return sum(left * right for left, right in zip(vec1, vec2))
def flatten(items, ignore_types=(str, bytes)):
    """Yield the values of a nested iterable as one flat stream.

    Adapted from the Python Cookbook, Third Edition.

    Parameters
    ----------
    items : :class:`~python:collections.abc.Iterable`
        Possibly nested iterable to flatten.
    ignore_types : :class:`~python:tuple`, optional
        Iterable types that are yielded whole instead of being expanded.
        The default of (:class:`~python:str`, :class:`~python:bytes`)
        keeps strings and bytes from being split into single characters.
        The setting applies at every nesting depth.

    Yields
    ------
    object
        Each value that is either not iterable or is an instance of one
        of `ignore_types`, in depth-first order.

    Examples
    --------
    >>> items = [1, 2, [3, 4, [5, 6], 7], 8]
    >>> list(flatten(items))
    [1, 2, 3, 4, 5, 6, 7, 8]
    >>> list(flatten([1, [(2, 3)]], ignore_types=(str, bytes, tuple)))
    [1, (2, 3)]

    """
    # Imported here because the ``collections.Iterable`` alias was removed
    # in Python 3.10; the ABC lives in ``collections.abc``.
    from collections.abc import Iterable

    for x in items:
        if isinstance(x, Iterable) and not isinstance(x, ignore_types):
            # Forward ignore_types so nested levels honour the caller's
            # choice rather than falling back to the default.
            yield from flatten(x, ignore_types)
        else:
            yield x
def repeatfunc(func, times=None, *args):
    """Return an iterator that calls `func` with `args` over and over.

    Parameters
    ----------
    func : callable
        Called as ``func(*args)`` for each produced item.
    times : {:class:`~python:None`, :class:`~python:int`}, optional
        Number of calls to make.  `None` repeats without end.
    *args
        Positional arguments passed to every call.

    Returns
    -------
    :class:`~python:collections.abc.Iterator`

    Examples
    --------
    >>> list(repeatfunc(pow, 3, 2, 5))
    [32, 32, 32]

    """
    argument_stream = repeat(args) if times is None else repeat(args, times)
    return starmap(func, argument_stream)
def pairwise(iterable):
    """Return an iterator of overlapping neighbour pairs from `iterable`.

    For input ``s0, s1, s2, ...`` the pairs are ``(s0, s1), (s1, s2), ...``.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`

    Returns
    -------
    :class:`~python:collections.abc.Iterator`
        Empty when `iterable` has fewer than two items.

    Examples
    --------
    >>> list(pairwise('ABCD'))
    [('A', 'B'), ('B', 'C'), ('C', 'D')]

    """
    trailing, leading = tee(iterable, 2)
    # Shift the second copy one step ahead; the None default keeps an
    # empty input from raising here.
    next(leading, None)
    return zip(trailing, leading)
def grouper(iterable, n, fillvalue=None):
    """Split `iterable` into consecutive tuples of length `n`.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`
    n : :class:`~python:int`
        Length of each tuple.
    fillvalue : object, optional
        Pads the final tuple when the items do not divide evenly by `n`.

    Returns
    -------
    :class:`~python:collections.abc.Iterator`

    Examples
    --------
    >>> list(grouper('ABCDEFG', 3, '?'))
    [('A', 'B', 'C'), ('D', 'E', 'F'), ('G', '?', '?')]

    """
    shared = iter(iterable)
    # Every slot refers to the same iterator object, so each output tuple
    # draws n successive items from it.
    slots = (shared,) * n
    return zip_longest(*slots, fillvalue=fillvalue)
def roundrobin(*iterables):
    """Yield one item from each iterable in turn until all are exhausted.

    Recipe credited to George Sakkis.

    Parameters
    ----------
    *iterables
        Any number of iterables to interleave.

    Yields
    ------
    object
        The next item of each still-active iterable, in rotation.

    Examples
    --------
    >>> list(roundrobin('ABC', 'D', 'EF'))
    ['A', 'D', 'E', 'B', 'F', 'C']

    """
    remaining = len(iterables)
    fetchers = cycle(iter(source).__next__ for source in iterables)
    while remaining:
        try:
            for fetch in fetchers:
                yield fetch()
        except StopIteration:
            # The fetcher that just failed was the last one drawn from the
            # cycle, so the next `remaining` entries are exactly the other
            # fetchers; rebuilding the cycle from them drops the dead one.
            remaining -= 1
            fetchers = cycle(islice(fetchers, remaining))
def partition(pred, iterable):
    """Split `iterable` into the entries failing `pred` and those passing it.

    Parameters
    ----------
    pred : callable
        One-argument function whose result is tested for truth.
    iterable : :class:`~python:collections.abc.Iterable`

    Returns
    -------
    :class:`~python:tuple`
        Two iterators ``(false_entries, true_entries)``, each preserving
        the original order.

    Examples
    --------
    >>> is_odd = lambda x: x % 2 != 0
    >>> evens, odds = partition(is_odd, range(10))
    >>> list(evens)
    [0, 2, 4, 6, 8]
    >>> list(odds)
    [1, 3, 5, 7, 9]

    """
    for_rejected, for_accepted = tee(iterable)
    rejected = filterfalse(pred, for_rejected)
    accepted = filter(pred, for_accepted)
    return rejected, accepted
def powerset(iterable):
    """Return an iterator over every subset of `iterable`, as tuples.

    Subsets are produced in order of increasing size, starting with the
    empty tuple.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`

    Examples
    --------
    >>> list(powerset([1, 2, 3]))
    [(), (1,), (2,), (3,), (1, 2), (1, 3), (2, 3), (1, 2, 3)]

    """
    pool = list(iterable)
    sizes = range(len(pool) + 1)
    return chain.from_iterable(combinations(pool, size) for size in sizes)
def unique_everseen(iterable, key=None):
    """Yield the elements of `iterable` in order, dropping any element
    already produced earlier.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`
    key : {None, callable}, optional
        Maps an element to the hashable value compared for uniqueness.
        When `None`, elements are compared directly.

    Yields
    ------
    object
        The first element seen for each distinct comparison value.

    Examples
    --------
    >>> list(unique_everseen('AAAABBBCCDAABBB'))
    ['A', 'B', 'C', 'D']
    >>> list(unique_everseen('ABBCcAD'))
    ['A', 'B', 'C', 'c', 'D']
    >>> list(unique_everseen('ABBCcAD', str.lower))
    ['A', 'B', 'C', 'D']

    """
    seen = set()
    remember = seen.add
    if key is not None:
        for element in iterable:
            fingerprint = key(element)
            if fingerprint in seen:
                continue
            remember(fingerprint)
            yield element
        return
    for element in iterable:
        if element not in seen:
            remember(element)
            yield element
def unique_elements(iterable, key=None):
    """Alias for :func:`~sknano.core.unique_everseen`.

    Both arguments are passed through unchanged and the resulting
    iterator is returned.
    """
    deduplicated = unique_everseen(iterable, key)
    return deduplicated
def unique_justseen(iterable, key=None):
    """Return an iterator that collapses each run of consecutive equal
    elements of `iterable` into the run's first element.

    Only the element just seen is remembered, so a value reappears if it
    comes back after a different one.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`
    key : {None, callable}, optional
        Maps an element to the value compared between neighbours.

    Returns
    -------
    :class:`~python:collections.abc.Iterator`

    Examples
    --------
    >>> list(unique_justseen('AAAABBBCCDAABBB'))
    ['A', 'B', 'C', 'D', 'A', 'B']
    >>> list(unique_justseen('ABBCcAD', str.lower))
    ['A', 'B', 'C', 'A', 'D']

    """
    runs = groupby(iterable, key)
    # groupby yields (key, members) pairs; keep only the members iterator.
    run_members = map(itemgetter(1), runs)
    # The first member of each run stands for the whole run.
    return map(next, run_members)
def iter_except(func, exception, first=None):
    """Yield the results of calling `func` repeatedly until it raises
    `exception`.

    Turns a call-until-exception interface into an iterator interface,
    in the manner of ``iter(func, sentinel)`` but ending on an exception
    instead of a sentinel value.

    Parameters
    ----------
    func : callable
        Called with no arguments for every produced item.
    exception : :class:`~python:Exception` class or tuple of classes
        Exception type(s) that end the iteration silently.
    first : {None, callable}, optional
        When given, called once with no arguments and its result yielded
        ahead of the `func` results (for example, database APIs needing
        an initial call such as ``db.first()``).

    Yields
    ------
    object
        The result of ``first()`` if supplied, then each ``func()`` result.

    Examples
    --------
    >>> list(iter_except(list(range(3)).pop, IndexError))
    [2, 1, 0]

    Other typical uses::

        iter_except(functools.partial(heappop, h), IndexError)  # heap
        iter_except(d.popitem, KeyError)       # non-blocking dict
        iter_except(d.popleft, IndexError)     # non-blocking deque
        iter_except(q.get_nowait, Queue.Empty) # producer queue
        iter_except(s.pop, KeyError)           # non-blocking set

    """
    try:
        if first is not None:
            yield first()
        while True:
            yield func()
    except exception:
        # The awaited exception marks normal exhaustion of the source.
        return
def first_true(iterable, default=False, pred=None):
    """Return the first item of `iterable` that passes the truth test.

    With `pred` left as `None` the item itself must be true; otherwise
    ``pred(item)`` must be true.  `default` is returned when no item
    qualifies.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`
    default : object, optional
        Fallback result.  Defaults to `False`.
    pred : {None, callable}, optional
        One-argument test applied to each item.

    Examples
    --------
    >>> first_true([(), None, '', 0, 1])
    1
    >>> first_true([(), None, '', 0], 'donkey')
    'donkey'
    >>> is_odd = lambda x: x % 2 != 0
    >>> first_true([2, 4, 6, 8, 11], default='donkey', pred=is_odd)
    11
    >>> first_true([2, 4, 6, 8], default='donkey', pred=is_odd)
    'donkey'

    """
    qualifying = filter(pred, iterable)
    return next(qualifying, default)
def random_product(*args, repeat=1):
    """Random selection from :func:`~python:itertools.product`.

    Parameters
    ----------
    *args
        Iterables forming the input pools.
    repeat : :class:`~python:int`, optional
        Number of times the sequence of pools is repeated.

    Returns
    -------
    :class:`~python:tuple`
        One randomly chosen element per pool, in pool order.

    """
    base_pools = [tuple(pool) for pool in args]
    pools = base_pools * repeat
    return tuple(map(random.choice, pools))
def random_permutation(iterable, r=None):
    """Random selection from :func:`~python:itertools.permutations`.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`
    r : {None, :class:`~python:int`}, optional
        Length of the result.  `None` uses every element of `iterable`.

    Returns
    -------
    :class:`~python:tuple`
        `r` elements sampled without replacement, in random order.

    """
    pool = tuple(iterable)
    if r is None:
        r = len(pool)
    sampled = random.sample(pool, r)
    return tuple(sampled)
def random_combination(iterable, r):
    """Random selection from :func:`~python:itertools.combinations`.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`
    r : :class:`~python:int`
        Number of elements to select.

    Returns
    -------
    :class:`~python:tuple`
        `r` elements drawn from distinct positions, kept in their
        original relative order.

    """
    pool = tuple(iterable)
    positions = random.sample(range(len(pool)), r)
    # Sorting the positions restores input order.
    positions.sort()
    return tuple(pool[position] for position in positions)
def random_combination_with_replacement(iterable, r):
    """Random selection from
    :func:`~python:itertools.combinations_with_replacement`.

    Parameters
    ----------
    iterable : :class:`~python:collections.abc.Iterable`
    r : :class:`~python:int`
        Number of elements to select.

    Returns
    -------
    :class:`~python:tuple`
        `r` elements picked from positions that may repeat, ordered by
        position in the input.

    """
    pool = tuple(iterable)
    size = len(pool)
    positions = [random.randrange(size) for _ in range(r)]
    positions.sort()
    return tuple(pool[position] for position in positions)