Source code for pypicontents.core.utils

# -*- coding: utf-8 -*-
#
# Please refer to AUTHORS.rst for a complete list of Copyright holders.
# Copyright (C) 2016-2022, PyPIContents Developers.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""
``pypicontents.core.utils`` is a utility module.

This module contains several utilities to process information coming from the
other modules.
"""

import os
import sys
import signal
import fnmatch
import pkgutil
from contextlib import contextmanager
from urllib.parse import urlparse, urlunparse, quote

from setuptools import find_packages

from .. import libdir

# NOTE(review): not referenced by any function visible in this part of the
# module; presumably read by other modules of the package -- confirm before
# renaming or removing it.
default_import_level = 0


def get_free_memory():
    """
    Add up the reclaimable memory counters listed in ``/proc/meminfo``.

    The ``MemFree``, ``Buffers`` and ``Cached`` entries are summed and the
    result is multiplied by 1024.

    :return: an integer with the sum of the three entries times 1024.
    """
    wanted = ('MemFree:', 'Buffers:', 'Cached:')
    total = 0
    with open('/proc/meminfo', 'r') as meminfo:
        for line in meminfo:
            fields = line.split()
            if str(fields[0]) in wanted:
                total += int(fields[1])
    return total * 1024


def get_children_processes(parent_pid):
    """
    List the children recorded for ``parent_pid`` under ``/proc``.

    :param parent_pid: the process id whose
                       ``/proc/<pid>/task/<pid>/children`` file is read.
    :return: a list of strings, one per whitespace-separated entry of the
             file (empty if the file has no entries).
    """
    children_file = '/proc/{0}/task/{1}/children'.format(parent_pid,
                                                         parent_pid)
    with open(children_file, 'r') as handle:
        content = handle.read()
    # A bare split() already discards leading/trailing whitespace and
    # newlines, so no explicit strip is required.
    return content.split()


class timeout(object):
    """
    Context manager that interrupts the enclosed block after ``sec`` seconds.

    On entry a ``SIGALRM`` handler is installed and an alarm is scheduled;
    if the alarm fires, :exc:`RuntimeError` is raised with ``error`` as its
    message. On exit the alarm is cancelled and the handler that was active
    before entering is put back, so the timeout does not leak into the rest
    of the process.

    The alarm is cancelled unconditionally on exit, so nesting two
    ``timeout`` blocks cancels the outer alarm when the inner one exits.

    :param sec: an integer number of seconds to wait before interrupting.
    :param error: the message of the :exc:`RuntimeError` raised on timeout.
    """

    def __init__(self, sec=20, error='Operation timed out.'):
        self.sec = sec
        self.error = error
        # Handler found installed on entry; restored on exit.
        self._previous_handler = None

    def handle_timeout(self, signum, frame):
        """Signal handler: abort the running block with ``RuntimeError``."""
        raise RuntimeError(self.error)

    def __enter__(self):
        # signal.signal() returns the handler it replaces, which is exactly
        # what must be restored afterwards.
        self._previous_handler = signal.signal(signal.SIGALRM,
                                               self.handle_timeout)
        signal.alarm(self.sec)
        return self

    def __exit__(self, type, value, traceback):
        signal.alarm(0)
        # The previous handler is None when it was not installed from
        # Python; such a handler cannot be re-installed, so it is skipped.
        if self._previous_handler is not None:
            signal.signal(signal.SIGALRM, self._previous_handler)
            self._previous_handler = None


def translate_letter_range(lr='0-z'):
    """
    Expand a letter specification into a list of lowercase characters.

    The specification is a comma separated list whose items are either a
    single entry (``'a'``) or an inclusive range (``'a-f'``). Ranges are
    expanded by character code and only digits (``0-9``) and lowercase
    ASCII letters (``a-z``) are kept. The whole specification is
    lowercased first, so ``'A-C'`` and ``'a-c'`` are equivalent.

    >>> translate_letter_range('a-c,x')
    ['a', 'b', 'c', 'x']

    :param lr: a string such as ``'0-z'``, ``'a,b,c'``, ``'q'`` or
               ``'0-9,x-z'``.
    :return: a list of strings, in the order given by ``lr``.
    :raises TypeError: if a range boundary is not exactly one character.
    """
    letters = []
    for item in lr.lower().split(','):
        if '-' not in item:
            letters.append(item)
            continue
        bounds = item.split('-')
        first, last = ord(bounds[0]), ord(bounds[1])
        for code in range(first, last + 1):
            char = chr(code)
            # Skip the punctuation and uppercase code points that sit
            # between the digits and the lowercase letters.
            if '0' <= char <= '9' or 'a' <= char <= 'z':
                letters.append(char)
    return letters


def filter_package_list(pkglist, lr):
    """
    Keep the packages whose first character matches one of ``lr``.

    The comparison lowercases the first character of each package name.
    Results are grouped following the order of ``lr``; inside each group
    the order of ``pkglist`` is kept.

    :param pkglist: a list of package names.
    :param lr: a list of lowercase characters to select.
    :return: a list with the selected package names.
    """
    selected = []
    for letter in lr:
        selected.extend(pkg for pkg in pkglist if pkg[0].lower() == letter)
    return selected


def create_file_if_notfound(filename):
    """
    Make sure ``filename`` exists, creating it (and its parents) if needed.

    Missing parent directories are created. If ``filename`` is not an
    existing file, an empty one is created; an existing file is left
    untouched.

    :param filename: a string containing the path of the file.
    :return: ``filename``, unchanged.
    """
    parent = os.path.dirname(os.path.abspath(filename))
    # exist_ok avoids failing when the directory appears between a check
    # and the creation (e.g. when several processes run concurrently).
    os.makedirs(parent, exist_ok=True)
    if not os.path.isfile(filename):
        # Append mode creates the file when missing but never truncates it,
        # so content written by someone else in the meantime is preserved.
        with open(filename, 'a'):
            pass
    return filename


def urlesc(url):
    """
    Percent-encode the path component of ``url``.

    Only the path is passed through ``quote``; scheme, host, parameters,
    query and fragment are reassembled as they were parsed.

    :param url: a string containing a URL.
    :return: the URL with its path quoted.
    """
    scheme, netloc, path, params, query, fragment = urlparse(url)
    return urlunparse((scheme, netloc, quote(path), params, query, fragment))


def get_tar_extension(path):
    """
    Extract the (possibly compound) extension of an archive name.

    Extensions are peeled off from the right until there are none left or
    until one of ``.tar``, ``.zip``, ``.tgz``, ``.whl`` or ``.egg`` is
    found; everything peeled so far is returned joined in its original
    order (``'pkg-1.0.tar.gz'`` gives ``'.tar.gz'``).

    :param path: a string containing a file name or path.
    :return: a string with the collected extensions, or an empty string if
             ``path`` has no extension.
    """
    terminal = ['.tar', '.zip', '.tgz', '.whl', '.egg']
    collected = []
    remainder = path

    while True:
        remainder, suffix = os.path.splitext(remainder)
        if not suffix:
            break
        # Prepending keeps the suffixes in left-to-right order.
        collected.insert(0, suffix)
        if suffix in terminal:
            break

    return ''.join(collected)


def human2bytes(s):
    """
    Attempts to guess the string format based on default symbols
    set and return the corresponding bytes as an integer.
    When unable to recognize the format ValueError is raised.

      >>> human2bytes('0 B')
      0
      >>> human2bytes('1 K')
      1024
      >>> human2bytes('1 M')
      1048576
      >>> human2bytes('1 Gi')
      1073741824
      >>> human2bytes('1 tera')
      1099511627776

      >>> human2bytes('0.5kilo')
      512
      >>> human2bytes('0.1  byte')
      0
      >>> human2bytes('1 k')  # k is an alias for K
      1024
      >>> human2bytes('12 foo')
      Traceback (most recent call last):
          ...
      ValueError: can't interpret '12 foo'

    :param s: a string made of a non-negative number followed by a unit
              symbol, optionally separated by whitespace.
    :return: the amount of bytes as an integer (fractions are truncated).
    :raises ValueError: if either the number or the symbol can not be
                        interpreted.
    """
    SYMBOLS = {
        'customary': ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y'),
        'customary_ext': ('byte', 'kilo', 'mega', 'giga', 'tera', 'peta',
                          'exa', 'zetta', 'iotta'),
        'iec': ('Bi', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi', 'Yi'),
        'iec_ext': ('byte', 'kibi', 'mebi', 'gibi', 'tebi', 'pebi', 'exbi',
                    'zebi', 'yobi'),
    }

    # The numeric part is the longest leading run of digits and dots.
    split_at = 0
    while split_at < len(s) and (s[split_at].isdigit() or
                                 s[split_at] == '.'):
        split_at += 1

    try:
        num = float(s[:split_at])
    except ValueError:
        # Missing or malformed number ('K', '1.2.3 K'): report it with the
        # same message used for unknown symbols.
        raise ValueError("can't interpret %r" % s) from None

    letter = s[split_at:].strip()
    for sset in SYMBOLS.values():
        if letter in sset:
            break
    else:
        if letter == 'k':
            # treat 'k' as an alias for 'K' as per: http://goo.gl/kTQMs
            sset = SYMBOLS['customary']
            letter = letter.upper()
        else:
            raise ValueError("can't interpret %r" % s)

    # The first symbol of a set means plain bytes; each following one is
    # 1024 times bigger than the previous.
    prefix = {sset[0]: 1}
    for i, symbol in enumerate(sset[1:]):
        prefix[symbol] = 1 << (i + 1) * 10
    return int(num * prefix[letter])
def u(u_string):
    """
    Return ``u_string`` as text, decoding it from UTF-8 when necessary.

    :param u_string: a string, or an object with a ``decode`` method (such
                     as a byte stream) to convert to unicode.
    :return: ``u_string`` itself if it already is a ``str``, otherwise the
             result of decoding it as UTF-8.

    .. versionadded:: 0.1.5
    """
    already_text = isinstance(u_string, str)
    return u_string if already_text else u_string.decode('utf-8')
def s(s_string):
    """
    Return ``s_string`` as a byte stream, encoding it to UTF-8 if needed.

    :param s_string: a byte stream, or an object with an ``encode`` method
                     (such as a string) to convert.
    :return: ``s_string`` itself if it already is ``bytes``, otherwise the
             result of encoding it as UTF-8.

    .. versionadded:: 0.1.5
    """
    already_bytes = isinstance(s_string, bytes)
    return s_string if already_bytes else s_string.encode('utf-8')
@contextmanager
def custom_sys_path(new_sys_path):
    """
    Context manager to momentarily change ``sys.path``.

    The original ``sys.path`` object is put back when the block ends, even
    if the block raises an exception.

    :param new_sys_path: a list of paths to overwrite ``sys.path``.

    .. versionadded:: 0.1.0
    """
    old_sys_path = sys.path
    sys.path = new_sys_path
    try:
        yield
    finally:
        # Without the finally clause an exception inside the block would
        # leave the replacement path installed for the whole process.
        sys.path = old_sys_path
@contextmanager
def remove_sys_modules(remove):
    """
    Context manager to momentarily remove modules from ``sys.modules``.

    The listed modules that are currently loaded are taken out of
    ``sys.modules`` for the duration of the block and put back when it
    ends, even if the block raises an exception. Names that are not loaded
    are ignored.

    :param remove: a list of modules to remove from ``sys.modules``.

    .. versionadded:: 0.1.0
    """
    # Keep the module objects themselves: holding a reference to the
    # sys.modules dictionary would not allow restoring deleted entries.
    removed = {}
    for name in remove:
        if name in sys.modules:
            removed[name] = sys.modules.pop(name)
    try:
        yield
    finally:
        # Update in place instead of rebinding sys.modules, so every other
        # reference to the dictionary keeps seeing the same object.
        sys.modules.update(removed)
def list_files(path=None, pattern='*'):
    """
    List files on ``path`` (non-recursively).

    Collect the regular files found directly inside ``path`` whose name
    matches ``pattern``. With the default pattern every file is returned.

    :param path: a string containing a path where the files will be
                 looked for.
    :param pattern: a string containing a shell-style wildcard pattern, as
                    understood by :mod:`fnmatch`.
    :return: a list of files matching the pattern within the first level
             of path (non-recursive), each one joined to ``path``.

    .. versionadded:: 0.1.0
    """
    assert isinstance(path, str)
    assert isinstance(pattern, str)

    matching = fnmatch.filter(os.listdir(path), pattern)
    candidates = (os.path.join(path, name) for name in matching)
    # Directories (and anything else that is not a file) are discarded.
    return [entry for entry in candidates if os.path.isfile(entry)]
def find_files(path=None, pattern='*'):
    """
    Locate all the files matching the supplied ``pattern`` in ``path``.

    Walk the directory tree rooted at the normalized ``path`` and collect
    every regular file whose name matches ``pattern``. With the default
    pattern every file is returned.

    :param path: a string containing a path where the files will be
                 looked for.
    :param pattern: a string containing a shell-style wildcard pattern, as
                    understood by :mod:`fnmatch`.
    :return: a list of files matching the pattern within path (recursive).

    .. versionadded:: 0.1
    """
    assert isinstance(path, str)
    assert isinstance(pattern, str)

    found = []
    for directory, _subdirs, files in os.walk(os.path.normpath(path)):
        candidates = [os.path.join(directory, name)
                      for name in fnmatch.filter(files, pattern)]
        found.extend(entry for entry in candidates if os.path.isfile(entry))
    return found
def is_valid_path(path):
    """
    Test if ``path`` is a valid python path.

    After normalizing ``path``, every component is inspected: a component
    containing a dot or a dash makes the path invalid, except for the
    special ``.`` and ``..`` entries.

    :param path: a string containing a path.
    :return: ``True`` if ``path`` is a valid python path. ``False``
             otherwise.

    .. versionadded:: 0.1.0
    """
    components = os.path.normpath(path).split(os.sep)
    return not any(
        ('.' in part or '-' in part) and part not in ['.', '..']
        for part in components
    )
def chunk_report(downloaded, total):
    """
    Print the progress of a download.

    A single status line ending in a carriage return is written to
    standard output; two newlines follow once ``downloaded`` reaches
    ``total``.

    :param downloaded: an integer representing the size (in bytes) of data
                       downloaded so far.
    :param total: an integer representing the total size (in bytes) of
                  data that needs to be downloaded.

    .. versionadded:: 0.1.0
    """
    percent = round((float(downloaded) / total) * 100, 2)
    status = 'Downloaded {0:0.0f} of {1:0.0f} kB ({2:0.0f}%)\r'.format(
        downloaded / 1024, total / 1024, percent)
    sys.stdout.write(status)

    if downloaded < total:
        return
    sys.stdout.write('\n\n')
def chunk_read(response, chunk_size=8192, report_hook=None):
    """
    Download a file by chunks.

    The chunks are collected as they arrive and decoded as UTF-8 only once
    the download is complete, so a multi-byte character split between two
    chunks is decoded correctly.

    :param response: a file object as returned by ``urlopen``.
    :param chunk_size: an integer representing the size of the chunks to
                       be downloaded at a time.
    :param report_hook: a function to report the progress of the download.
                        It is called as ``report_hook(downloaded, total)``
                        after every chunk. It is not called when the
                        response has no ``Content-Length`` header, because
                        the total size is unknown.
    :return: a string containing the downloaded file.

    .. versionadded:: 0.1.0
    """
    content_length = response.info().get('Content-Length')
    total = None if content_length is None else int(content_length.strip())

    chunks = []
    downloaded = 0

    while True:
        chunk = response.read(chunk_size)

        if not chunk:
            break

        chunks.append(chunk)
        downloaded += len(chunk)

        if report_hook and total is not None:
            report_hook(downloaded, total)

    # A response that already yields text needs no decoding.
    if chunks and isinstance(chunks[0], str):
        return ''.join(chunks)
    return b''.join(chunks).decode('utf-8')
def get_packages(path):
    """
    List packages living in ``path`` with its directory.

    Every directory reported by ``get_package_dirs`` is scanned with
    ``find_packages``. The directory of each package is asked to its
    loader, with ``sys.path`` limited to that directory plus ``libdir``;
    if that fails for any reason, the directory is derived from the dotted
    package name instead.

    :param path: a path pointing to a directory containing python code.
    :return: a list of ``[name, directory]`` pairs containing the name of
             the package and the package directory. For example::

                 [
                     ['package_a', '/path/to/package_a'],
                     ['package_b.module_b', '/path/to/package_b/module_b'],
                     ['package_c.module_c', '/path/to/package_c/module_c']
                 ]

    .. versionadded:: 0.1.0
    """
    found = []
    for _dir in get_package_dirs(path):
        for pkgname in find_packages(_dir):
            try:
                # NOTE(review): ``filename`` does not look like an attribute
                # of Python 3 loaders; if it is missing this always ends up
                # in the fallback below -- confirm.
                with custom_sys_path([_dir, libdir]), \
                        remove_sys_modules([pkgname]):
                    pkgdir = pkgutil.get_loader(pkgname).filename
            except Exception:
                # Fallback: turn the dotted name into a relative path.
                pkgdir = os.path.join(_dir, pkgname.replace('.', os.sep))
            found.append([pkgname, pkgdir])
    return found
def get_modules(pkgdata):
    """
    List modules inside packages provided in ``pkgdata``.

    Each ``*.py`` file found in the first level of a package directory
    contributes ``<package>.<module>``; files whose name starts with a
    double underscore (such as ``__init__.py``) contribute the package
    name itself.

    :param pkgdata: a list of tuples containing the name of a package and
                    the directory where its located.
    :return: a sorted list, without duplicates, of the modules according
             to the list of packages provided in ``pkgdata``.

    .. versionadded:: 0.1.0
    """
    names = set()
    for pkgname, pkgdir in pkgdata:
        for pyfile in list_files(pkgdir, '*.py'):
            stem = os.path.splitext(os.path.basename(pyfile))[0]
            if stem.startswith('__'):
                names.add(pkgname)
            else:
                names.add('.'.join([pkgname, stem]))
    return sorted(names)
def get_package_dirs(path):
    """
    List directories containing python packages on ``path``.

    For every ``__init__.py`` found below ``path`` whose location (relative
    to ``path``) is a valid python path, walk up the tree until reaching a
    directory without an ``__init__.py``; that directory is the one that
    contains the top level package and is the one reported.

    ``path`` is normalized before being used, so equivalent spellings such
    as ``'src'``, ``'src/'`` or ``'./src'`` give the same result.

    :param path: a path pointing to a directory containing python code.
    :return: a list containing directories of packages, without
             duplicates.

    .. versionadded:: 0.1.0
    """
    # find_files() reports paths built from the normalized root, so the
    # prefix comparison below must use the same normalized form.
    root = os.path.normpath(path)
    package_dirs = []

    for init in find_files(root, '__init__.py'):
        pkgdir = os.path.dirname(init)

        if os.path.commonprefix([pkgdir, root]) != root or \
           not is_valid_path(os.path.relpath(pkgdir, root)):
            continue

        container = pkgdir
        while os.path.isfile(os.path.join(container, '__init__.py')):
            parent = os.path.split(container)[0]
            if parent == container:
                # Nothing left to climb (filesystem root or an empty
                # relative path): stop instead of looping forever.
                break
            container = parent

        if container not in package_dirs:
            package_dirs.append(container)

    return package_dirs