# -*- coding: utf-8 -*- """ ################################################################################ # # # shijian # # # ################################################################################ # # # LICENCE INFORMATION # # # # This program provides change, time, file, list, statistics, language and # # other utilities. # # # # copyright (C) 2014 William Breaden Madden # # # # This software is released under the terms of the GNU General Public License # # version 3 (GPLv3). # # # # This program is free software: you can redistribute it and/or modify it # # under the terms of the GNU General Public License as published by the Free # # Software Foundation, either version 3 of the License, or (at your option) # # any later version. # # # # This program is distributed in the hope that it will be useful, but WITHOUT # # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or # # FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for # # more details. # # # # For a copy of the GNU General Public License, see # # . # # # ################################################################################ """ from __future__ import division import calendar import collections import datetime import functools import inspect import logging import math import os import pickle import random import re import sys import tempfile import time import unicodedata import uuid import warnings if sys.version_info[0] < 3: import subprocess32 as subprocess else: import subprocess import dateutil.relativedelta import matplotlib.pyplot as plt import numpy import pandas as pd import scipy.interpolate import scipy.io.wavfile from sklearn.preprocessing import MinMaxScaler import seaborn as sns import technicolor name = "shijian" __version__ = "2023-10-19T0215Z" log = logging.getLogger(name) log.addHandler(technicolor.ColorisingStreamHandler()) log.setLevel(logging.INFO) def _main(): global clocks clocks = Clocks() def time_UNIX( style = "UNIX time S" ): return style_datetime_object( datetime_object = datetime.datetime.utcnow(), style = style ) def time_UTC( style = None ): return style_datetime_object( datetime_object = datetime.datetime.utcnow(), style = style ) def filename_time_UNIX( style = "UNIX time S.SSSSSS", extension = None ): filename = str( time_UNIX( style = style ) ) if extension: filename = filename + extension filename_proposed = propose_filename( filename = filename ) return filename_proposed def filename_time_UTC( style = "YYYY-MM-DDTHHMMSSZ", extension = None ): filename = style_datetime_object( datetime_object = datetime.datetime.utcnow(), style = style ) if extension: filename = filename + extension filename_proposed = propose_filename( filename = filename ) return filename_proposed def style_minimal_seconds(seconds): time_intervals = ["days", "hours", "minutes", "seconds"] dateutil_object = dateutil.relativedelta.relativedelta(seconds = seconds) return " ".join("{} {}".format( int(getattr(dateutil_object, interval)), interval ) for interval in time_intervals if getattr(dateutil_object, interval)) def style_UNIX_timestamp( timestamp = None, style = "YYYY-MM-DDTHHMMZ" ): return style_datetime_object( datetime_object = datetime.datetime.utcfromtimestamp(timestamp), style = style ) def style_datetime_object( datetime_object = None, style = "YYYY-MM-DDTHHMMZ" ): if type(datetime_object) is datetime.datetime: # filename safe if style == "YYYY-MM-DDTHHMMZ": return datetime_object.strftime("%Y-%m-%dT%H%MZ") # filename safe with seconds elif style == "YYYY-MM-DDTHHMMSSZ": return datetime_object.strftime("%Y-%m-%dT%H%M%SZ") # filename safe with seconds and microseconds elif style == "YYYY-MM-DDTHHMMSSMMMMMMZ": return datetime_object.strftime("%Y-%m-%dT%H%M%S%fZ") # elegant elif style == "YYYY-MM-DD HH:MM:SS UTC": return datetime_object.strftime("%Y-%m-%d %H:%M:%S UTC") # elegant elif style == "YYYY-MM-DD HH:MM:SS Z": return datetime_object.strftime("%Y-%m-%d %H:%M:%S Z") # UNIX time in seconds with second fraction elif style == "UNIX time S.SSSSSS": return (datetime_object -\ datetime.datetime.utcfromtimestamp(0)).total_seconds() # UNIX time in seconds rounded elif style == "UNIX time S": return int((datetime_object -\ datetime.datetime.utcfromtimestamp(0)).total_seconds()) # human-readable date elif style == "day DD month YYYY": return datetime_object.strftime("%A %d %B %Y") # human-readable time and date elif style == "HH:MM day DD month YYYY": return datetime_object.strftime("%H:%M %A %d %B %Y") # human-readable time with seconds and date elif style == "HH:MM:SS day DD month YYYY": return datetime_object.strftime("%H:%M:%S %A %d %B %Y") # human-readable date with time with seconds elif style == "day DD month YYYY HH:MM:SS": return datetime_object.strftime("%A %d %B %Y %H:%M:%S") # human-readable-audible time with seconds and date elif style == "HH hours MM minutes SS sounds day DD month YYYY": return datetime_object.strftime("%H hours %M minutes %S seconds %A %d %B %Y") # human-readable days, hours and minutes elif style == "DD:HH:MM": return datetime_object.strftime("%d:%H:%M") # human-readable days, hours, minutes and seconds elif style == "DD:HH:MM:SS": return datetime_object.strftime("%d:%H:%M:%S") # human-readable time with seconds elif style == "HH:MM:SS": return datetime_object.strftime("%H:%M:%S") # human-readable-audible time with seconds elif style == "HH hours MM minutes SS seconds": return datetime_object.strftime("%H hours %M minutes %S seconds") # filename safe else: return datetime_object.strftime("%Y-%m-%dT%H%MZ") if type(datetime_object) is datetime.timedelta: if style == "YYYY-MM-DDTHHMMZ": style = "{DD} days, {HH}:{MM}:{SS}" if hasattr(datetime_object, "seconds"): seconds = datetime_object.seconds + datetime_object.days * 24 * 3600 else: seconds = int(datetime_object) seconds_total = seconds minutes = int(math.floor(seconds / 60)) minutes_total = minutes seconds -= minutes * 60 hours = int(math.floor(minutes / 60)) hours_total = hours minutes -= hours * 60 days = int(math.floor(hours / 24)) days_total = days hours -= days * 24 years = int(math.floor(days / 365)) years_total = years days -= years * 365 return style.format(**{ "Y" : years_total, "D" : days_total, "H" : hours_total, "M" : minutes_total, "S" : seconds_total, "YYYY": str(years).zfill(4), "DD" : str(days).zfill(2), "HH" : str(hours).zfill(2), "MM" : str(minutes).zfill(2), "SS" : str(seconds).zfill(2) }) def HHMM_to_minutes( HHMM # string "HHMM" ): hours, minutes = HHMM[:2], HHMM[2:] return 60 * int(hours) + int(minutes) def now_in_minutes(): now = datetime.datetime.utcnow() return 60 * now.hour + now.minute def in_daily_time_range( time_range = None, # string "HHMM--HHMM" e.g. "1700--1000" time_start = None, # string "HHMM" e.g. "1700" time_stop = None # string "HHMM" e.g. "1000" ): if time_range is None and time_start is None and time_stop is None: return None if time_range is not None: time_start = time_range.split("--")[0] time_stop = time_range.split("--")[1] now = now_in_minutes() time_start = HHMM_to_minutes(time_start) time_stop = HHMM_to_minutes(time_stop) minutes_per_day = 1440 return (now - time_start) % minutes_per_day <=\ (time_stop - time_start) % minutes_per_day def timer(function): @functools.wraps(function) def decoration( *args, **kwargs ): arguments = inspect.getcallargs(function, *args, **kwargs) clock = Clock(name = function.__name__) result = function(*args, **kwargs) clock.stop() return result return decoration class Clock(object): def __init__( self, name = None, start = True ): self._name = name self._start = start # Boolean start clock on instantiation self._start_time = None # internal (value to return) self._start_time_tmp = None # internal (value for calculations) self._stop_time = None # internal (value to return) self._update_time = None # internal # If no name is specified, generate a unique one. if self._name is None: self._name = UID() # If a global clock list is detected, add a clock instance to it. if "clocks" in globals(): clocks.add(self) self.reset() if self._start: self.start() def start(self): self._start_time_tmp = datetime.datetime.utcnow() self._start_time = datetime.datetime.utcnow() def stop(self): self.update() self._update_time = None self._start_time_tmp = None self._stop_time = datetime.datetime.utcnow() # Update the clock accumulator. def update(self): if self._update_time: self.accumulator += ( datetime.datetime.utcnow() - self._update_time ) else: self.accumulator += ( datetime.datetime.utcnow() - self._start_time_tmp ) self._update_time = datetime.datetime.utcnow() def reset(self): self.accumulator = datetime.timedelta(0) self._start_time_tmp = None # If the clock has a start time, add the difference between now and the # start time to the accumulator and return the accumulation. If the clock # does not have a start time, return the accumulation. def elapsed(self): if self._start_time_tmp: self.update() return self.accumulator def name(self): return self._name def time(self): return self.elapsed().total_seconds() def start_time(self): if self._start_time: return style_datetime_object(datetime_object = self._start_time) else: return "none" def stop_time(self): if self._stop_time: return style_datetime_object(datetime_object = self._stop_time) else: return "none" def report(self): string = "clock attribute".ljust(39) + "value" string += "\nname".ljust(40) + self.name() string += "\ntime start (s)".ljust(40) + self.start_time() string += "\ntime stop (s)".ljust(40) + self.stop_time() string += "\ntime elapsed (s)".ljust(40) + str(self.time()) string += "\n" return string def printout(self): print(self.report()) class Clocks(object): def __init__( self ): self._list_of_clocks = [] self._default_report_style = "statistics" def add( self, clock ): self._list_of_clocks.append(clock) def report( self, style = None ): if style is None: style = self._default_report_style if self._list_of_clocks != []: if style == "statistics": # Create a dictionary of clock types with corresponding lists of # times for all instances. dictionary_of_clock_types = {} # Get the names of all clocks and add them to the dictionary. for clock in self._list_of_clocks: dictionary_of_clock_types[clock.name()] = [] # Record the values of all clocks for their respective names in # the dictionary. for clock in self._list_of_clocks: dictionary_of_clock_types[clock.name()].append(clock.time()) # Create a report, calculating the average value for each clock # type. string = "clock type".ljust(39) + "mean time (s)" for name, values in list(dictionary_of_clock_types.items()): string += "\n" +\ str(name).ljust(39) + str(sum(values)/len(values)) string += "\n" elif style == "full": # Create a report, listing the values of all clocks. string = "clock".ljust(39) + "time (s)" for clock in self._list_of_clocks: string += "\n" +\ str(clock.name()).ljust(39) + str(clock.time()) string += "\n" else: string = "no clocks" return string def printout( self, style = None ): if style is None: style = self._default_report_style print(self.report(style = style)) class Progress(object): def __init__( self ): self.data = [] self.quick_calculation = False self.update_rate = 1 # s self.clock = Clock(name = "progress update clock") def engage_quick_calculation_mode( self ): self.quick_calculation = True def disengage_quick_calculation_mode( self ): self.quick_calculation = False def add_datum( self, fraction = None, style = None ): if len(self.data) == 0: self.data.append((fraction, time_UNIX())) elif self.quick_calculation is True: time_duration_since_last_update = self.clock.time() if time_duration_since_last_update >= self.update_rate: self.data.append((fraction, time_UNIX())) self.clock.reset() self.clock.start() else: self.data.append((fraction, time_UNIX())) return self.status(style = style) def estimated_time_of_completion( self ): if len(self.data) <= 1: return 0 else: try: model_values = model_linear( self.data, quick_calculation = self.quick_calculation ) b0 = model_values[0] b1 = model_values[1] x = 1 y = b0 + b1 * x except: y = 0 datetime_object = datetime.datetime.fromtimestamp(int(y)) return datetime_object # estimated time of arrival def ETA( self ): if len(self.data) <= 1: return style_datetime_object( datetime_object = datetime.datetime.now() ) else: return style_datetime_object( datetime_object = self.estimated_time_of_completion() ) # estimated time remaining def ETR( self ): if len(self.data) <= 1: return 0 else: delta_time = \ self.estimated_time_of_completion() - datetime.datetime.now() if delta_time.total_seconds() >= 0: return delta_time.total_seconds() else: return 0 def fraction( self ): return self.data[-1][0] def percentage( self ): return 100 * self.fraction() def status( self, style = None ): if style is None: message =\ "{percentage:.2f}% complete; " +\ "estimated completion time: {ETA} ({ETR:.2f} s)\r" return message.format( percentage = self.percentage(), ETA = self.ETA(), ETR = self.ETR() ) def UID(): return str(uuid.uuid4()) def unique_number( style = None ): # mode: integer 3 significant figures if style == "integer 3 significant figures": initial_number = 100 if "unique_numbers_3_significant_figures" not in globals(): global unique_numbers_3_significant_figures unique_numbers_3_significant_figures = [] if not unique_numbers_3_significant_figures: unique_numbers_3_significant_figures.append(initial_number) else: unique_numbers_3_significant_figures.append( unique_numbers_3_significant_figures[-1] + 1 ) if\ style == "integer 3 significant figures" and \ unique_numbers_3_significant_figures[-1] > 999: raise Exception return unique_numbers_3_significant_figures[-1] # mode: integer else: initial_number = 1 if "unique_numbers" not in globals(): global unique_numbers unique_numbers = [] if not unique_numbers: unique_numbers.append(initial_number) else: unique_numbers.append(unique_numbers[-1] + 1) return unique_numbers[-1] def unique_3_digit_number(): return unique_number(style = "integer 3 significant figures") ## @brief make text filename or URL safe def slugify( text = None, filename = True, URL = False, return_str = True ): if not sys.version_info >= (3, 0): text = unicode(text, "utf-8") text = unicodedata.normalize("NFKD", text) text = text.encode("ascii", "ignore") text = text.decode("utf-8") text = re.sub("[^\w\s-]", "", text) text = text.strip() if filename and not URL: text = re.sub("[\s]+", "_", text) elif URL: text = text.lower() text = re.sub("[-\s]+", "-", text) if return_str: text = str(text) return text ## @brief propose a filename # @detail This function returns a filename string. If a default filename is not # specified, the function generates one based on the current time. If a default # filename is specified, the function uses it as the default filename. By # default, the function then checks to see if using the filename would cause # overwriting of an existing file. If overwriting is possible, the function # appends an integer to the filename in a loop in order to generate a filename # that would not cause overwriting of an existing file. The function can be set # to overwrite instead of using the default overwrite protection behaviour. # @return filename string def propose_filename( filename = None, overwrite = False, slugify_filename = True, exclude_extension_from_slugify = True ): # If no file name is specified, generate one. if not filename: filename = time_UTC() filename_proposed = filename if slugify_filename: if exclude_extension_from_slugify: filename_base = os.path.splitext(os.path.basename(filename))[0] filename_extension = os.path.splitext(os.path.basename(filename))[1] filename_base = slugify(text = filename_base) filename_proposed = filename_base + filename_extension else: filename_proposed = slugify(text = filename) if not overwrite: count = 0 while os.path.exists(filename_proposed): count = count + 1 filename_directory = os.path.dirname(filename) filename_base = os.path.splitext(os.path.basename(filename))[0] filename_extension = os.path.splitext(os.path.basename(filename))[1] if filename_directory: filename_proposed = filename_directory + \ "/" + \ filename_base + \ "_" + \ str(count) + \ filename_extension else: filename_proposed = filename_base + \ "_" + \ str(count) + \ filename_extension return filename_proposed def tmp_filepath(): """ Return an extensionless filepath at the directory /tmp without creating a file at the filepath. """ return "/tmp/" + next(tempfile._get_candidate_names()) def tail( filepath = "log.txt", lines = 50 ): """ Return a specified number of last lines of a specified file. If there is an error or the file does not exist, return False. """ try: filepath = os.path.expanduser(os.path.expandvars(filepath)) if os.path.isfile(filepath): text = subprocess.check_output(["tail", "-" + str(lines), filepath]) if text: return text else: return False else: return False except: return False def ensure_platform_release( keyphrase = "el7", require = True, warn = False ): import platform release = platform.release() if keyphrase not in release: message =\ "inappropriate environment: " +\ "\"{keyphrase}\" required; \"{release}\" available".format( keyphrase = keyphrase, release = release ) if warn is True: log.warning(message) if require is True: log.fatal(message) raise(EnvironmentError) def ensure_program_available( program ): log.debug("ensure program {program} available".format( program = program )) if which(program) is None: log.error("program {program} not available".format( program = program )) raise(EnvironmentError) else: log.debug("program {program} available".format( program = program )) def which( program ): def is_exe(fpath): return os.path.isfile(fpath) and os.access(fpath, os.X_OK) fpath, fname = os.path.split(program) if fpath: if is_exe(program): return(program) else: for path in os.environ["PATH"].split(os.pathsep): path = path.strip('"') exe_file = os.path.join(path, program) if is_exe(exe_file): return exe_file return None def running( program ): program = str.encode(program) results = subprocess.Popen( ["ps", "-A"], stdout = subprocess.PIPE ).communicate()[0].split(b"\n") matches_current = [ line for line in results if program in line and b"defunct" not in line ] if matches_current: return True else: return False def ensure_file_existence( filename ): log.debug("ensure existence of file {filename}".format( filename = filename )) if not os.path.isfile(os.path.expandvars(filename)): log.error("file {filename} does not exist".format( filename = filename )) raise(IOError) else: log.debug("file {filename} found".format( filename = filename )) def rm_file(filename): os.remove(filename) ## @brief return a naturally-sorted list of filenames that are in a sequence or ## a dictionary of lists of filenames that are in a sequence def find_file_sequences( extension = "png", directory = ".", return_first_sequence_only = True, ): filenames_of_directory = os.listdir(directory) filenames_found = [ filename for filename in filenames_of_directory if re.match( r".*\d+.*\." + extension, filename ) ] filename_sequences = collections.defaultdict(list) for filename in filenames_found: pattern = re.sub("\d+", "XXX", filename) filename_sequences[pattern].append(filename) if return_first_sequence_only is True: first_key_identified = next(iter(filename_sequences.keys())) filename_sequence = \ natural_sort(filename_sequences[first_key_identified]) return filename_sequence else: return filename_sequences ## @brief return a list of files at a specified directory def ls_files( directory = "." ): return([filename for filename in os.listdir(directory) if os.path.isfile( os.path.join(directory, filename) )]) ## @brief return a list of files, directories and subdirectories at a specified ## directory def directory_listing( directory = ".", ): files_list = [] for root, directories, filenames in os.walk(directory): for filename in filenames: files_list.append(os.path.join(root, filename)) return files_list ## @brief return a list of filepaths at a directory, optionally filtered to ## contain a specified extension def filepaths_at_directory( directory = None, extension_required = None ): if not os.path.isdir(directory): log.error("error -- directory {directory} not found".format(directory = directory)) raise(IOError) filepaths = [os.path.abspath(os.path.join(directory, filename)) for filename in os.listdir(directory) if os.path.isfile(os.path.join(directory, filename))] if extension_required: filepaths = [filepath for filepath in filepaths if extension_required in os.path.splitext(filepath)[1]] return filepaths def engage_command( command = None, background = True, timeout = None ): log.debug(command) if background: if timeout: log.warning("warning -- command set to run in background; ignoring timeout") subprocess.Popen( [command], shell = True, executable = "/bin/bash" ) return None elif not background: process = subprocess.Popen( [command], shell = True, executable = "/bin/bash", stdout = subprocess.PIPE ) try: process.wait(timeout = timeout) output, errors = process.communicate(timeout = timeout) return output except: process.kill() return False else: return None def percentage_power(): try: filenames_power = engage_command(command = "upower -e") filenames_power = [line for line in filenames_power.split("\n") if line] filenames_power_battery = [filename for filename in filenames_power if "battery" in filename] filename_power_battery = filenames_power_battery[0] if filenames_power_battery else None filenames_power_line = [filename for filename in filenames_power if "line" in filename] filename_power_line = filenames_power_line[0] if filenames_power_line else None if filename_power_battery: power_data = engage_command(command = "upower -i {filename}".format(filename = filename_power_battery)) percentage_power = [line for line in power_data.split("\n") if "percentage" in line][0].split()[1] elif filename_power_line: percentage_power = "100%" else: percentage_power = None return percentage_power except: return None def convert_type_list_elements( list_object = None, element_type = str ): """ Recursively convert all elements and all elements of all sublists of a list to a specified type and return the new list. """ if element_type is str: return [str(element) if not isinstance(element, list) else convert_type_list_elements( list_object = element, element_type = str ) for element in list_object] class List_Consensus(list): """ This class is designed to instantiate a list of elements. It features functionality that limits approximately the memory usage of the list. On estimating the size of the list as greater than the specified or default size limit, the list reduces the number of elements it contains. The list provides functionality to return its most frequent element, which can be used to determine its "consensus" element. """ def __init__( self, *args ): # list initialisation if sys.version_info >= (3, 0): super().__init__(*args) else: super(List_Consensus, self).__init__(*args) self.size_constraint = 150 # bytes def set_size_constraint( self, size = None ): if size is not None: self.size_constraint = size def ensure_size( self, size = None ): """ This function removes the least frequent elements until the size constraint is met. """ if size is None: size = self.size_constraint while sys.getsizeof(self) > size: element_frequencies = collections.Counter(self) infrequent_element = element_frequencies.most_common()[-1:][0][0] self.remove(infrequent_element) def append( self, element, ensure_size = True, size = None ): if size is None: size = self.size_constraint list.append(self, element) if ensure_size: self.ensure_size( size = size ) def consensus( self ): try: element_frequencies = collections.Counter(self) return element_frequencies.most_common(1)[0][0] except: return None ## @brief return a naturally-sorted list # @detail This function returns a naturally-sorted list from an input list. def natural_sort( list_object ): convert = lambda text: int(text) if text.isdigit() else text.lower() alphanumeric_key = lambda key: [ convert(text) for text in re.split("([0-9]+)", key) ] return sorted(list_object, key = alphanumeric_key) def indices_of_list_element_duplicates( x ): seen = set() for index, element in enumerate(x): if isinstance(element, list): element = tuple(element) if isinstance(element, dict): element = tuple(element.items()) if element not in seen: seen.add(element) else: yield index def indices_of_greatest_values( x, number = 5 ): if len(x) <= number: number = len(x) return [y[0] for y in sorted(enumerate(x), key = lambda y: y[1])[-number:]] def unique_list_elements(x): unique_elements = [] for element in x: if element not in unique_elements: unique_elements.append(element) return unique_elements def select_spread( list_of_elements = None, number_of_elements = None ): """ This function returns the specified number of elements of a list spread approximately evenly. """ if len(list_of_elements) <= number_of_elements: return list_of_elements if number_of_elements == 0: return [] if number_of_elements == 1: return [list_of_elements[int(round((len(list_of_elements) - 1) / 2))]] return \ [list_of_elements[int(round((len(list_of_elements) - 1) /\ (2 * number_of_elements)))]] +\ select_spread(list_of_elements[int(round((len(list_of_elements) - 1) /\ (number_of_elements))):], number_of_elements - 1) def split_list( list_object = None, granularity = None ): """ This function splits a list into a specified number of lists. It returns a list of lists that correspond to these parts. Negative numbers of parts are not accepted and numbers of parts greater than the number of elements in the list result in the maximum possible number of lists being returned. """ if granularity < 0: raise Exception("negative granularity") mean_length = len(list_object) / float(granularity) split_list_object = [] last_length = float(0) if len(list_object) > granularity: while last_length < len(list_object): split_list_object.append( list_object[int(last_length):int(last_length + mean_length)] ) last_length += mean_length else: split_list_object = [[element] for element in list_object] return split_list_object def ranges_edge_pairs( extent = None, range_length = None ): """ Return the edges of ranges within an extent of some length. For example, to separate 76 variables into groups of at most 20 variables, the ranges of the variables could be 0 to 20, 21 to 41, 42 to 62 and 63 to 76. These range edges could be returned by this function as a list of tuples: >>> ranges_edge_pairs( ... extent = 76, # number of variables ... range_length = 20 # maximum number of variables per plot ... ) [(0, 20), (21, 41), (42, 62), (63, 76)] """ number_of_ranges = int(math.ceil(extent / range_length)) return [ ( index * range_length + index, min((index + 1) * range_length + index, extent) ) for index in range(0, number_of_ranges) ] def Markdown_list_to_dictionary( Markdown_list = None ): line = re.compile(r"( *)- ([^:\n]+)(?:: ([^\n]*))?\n?") depth = 0 stack = [{}] for indent, name, value in line.findall(Markdown_list): indent = len(indent) if indent > depth: assert not stack[-1], "unexpected indent" elif indent < depth: stack.pop() stack[-1][name] = value or {} if not value: # new branch stack.append(stack[-1][name]) depth = indent return stack[0] def Markdown_list_to_OrderedDict( Markdown_list = None ): line = re.compile(r"( *)- ([^:\n]+)(?:: ([^\n]*))?\n?") depth = 0 stack = [collections.OrderedDict()] for indent, name, value in line.findall(Markdown_list): indent = len(indent) if indent > depth: assert not stack[-1], "unexpected indent" elif indent < depth: stack.pop() stack[-1][name] = value or collections.OrderedDict() if not value: # new branch stack.append(stack[-1][name]) depth = indent return stack[0] def open_configuration( filename = None ): file_configuration = open(filename, "r").read() return Markdown_list_to_OrderedDict(file_configuration) def change_list_resolution( values = None, length = None, interpolation_type = "linear", dimensions = 1 ): y1 = values x1 = list(range(0, len(values))) interpolation = scipy.interpolate.interp1d( x1, y1, kind = interpolation_type ) x2 = list(numpy.linspace(min(x1), max(x1), length)) y2 = [float(interpolation(x)) for x in x2] if dimensions == 1: return y2 elif dimensions == 2: return (x2, y2) def change_waveform_to_rectangle_waveform( values = None, fraction_amplitude = 0.01 ): values[values >= 0] = fraction_amplitude * max(values) values[values < 0] = fraction_amplitude * min(values) values[:] = [x * (1 / fraction_amplitude) for x in values] return values def change_sound_file_waveform_to_sound_file_rectangle_waveform( filename_waveform = None, filename_rectangle_waveform = None, overwrite = False, fraction_amplitude = 0.01 ): if filename_rectangle_waveform is None: filename_rectangle_waveform = filename_waveform filename_rectangle_waveform = propose_filename( filename = filename_rectangle_waveform, overwrite = overwrite ) rate, values = scipy.io.wavfile.read(filename_waveform) values = change_waveform_to_rectangle_waveform( values = values, fraction_amplitude = fraction_amplitude ) values[values >= 0] = fraction_amplitude * max(values) values[values < 0] = fraction_amplitude * min(values) values[:] = [x * (1 / fraction_amplitude) for x in values] scipy.io.wavfile.write(filename_rectangle_waveform, rate, values) def normalize( x, summation = None ): if summation is None: summation = sum(x) # normalize to unity return [element/summation for element in x] def rescale( x, minimum = 0, maximum = 1 ): return [ minimum + (element - min(x)) * ((maximum - minimum)\ / (max(x) - min(x))) for element in x ] def composite_variable( x ): k = len(x) + 1 variable = 0 for index, element in enumerate(x): variable += k**(index - 1) * element return variable def model_linear( data = None, quick_calculation = False ): if quick_calculation is True: data = select_spread(data, 10) n = len(data) x_values = [] y_values = [] x_squared_values = [] xy_values = [] for datum in data: x = datum[0] y = datum[1] x_values.append(x) y_values.append(y) x_squared_values.append(x ** 2) xy_values.append(x * y) b1 = (sum(xy_values) - (sum(x_values) * sum(y_values)) / n) / \ (sum(x_squared_values) - (sum(x_values) ** 2) / n) b0 = (sum(y_values) - b1 * sum(x_values)) / n return (b0, b1) def import_object( filename = None ): return pickle.load(open(filename, "rb")) def export_object( x, filename = None, overwrite = False ): filename = propose_filename( filename = filename, overwrite = overwrite ) pickle.dump(x, open(filename, "wb")) def string_to_bool(x): return x.lower() in ("yes", "true", "t", "1") def ustr(text): """ Convert a string to Python 2 unicode or Python 3 string as appropriate to the version of Python in use. """ if text is not None: if sys.version_info >= (3, 0): return str(text) else: return unicode(text) else: return text def number_to_English_text( number = None ): ones = [ "", "one ", "two ", "three ", "four ", "five ", "six ", "seven ", "eight ", "nine " ] teens = [ "ten ", "eleven ", "twelve ", "thirteen ", "fourteen ", "fifteen ", "sixteen ", "seventeen ", "eighteen ", "nineteen " ] tens = [ "", "", "twenty ", "thirty ", "forty ", "fifty ", "sixty ", "seventy ", "eighty ", "ninety " ] thousands = [ "", "thousand ", "million ", "billion ", "trillion ", "quadrillion ", "quintillion ", "sextillion ", "septillion ", "octillion ", "nonillion ", "decillion ", "undecillion ", "duodecillion ", "tredecillion ", "quattuordecillion ", "quindecillion", "sexdecillion ", "septendecillion ", "octodecillion ", "novemdecillion ", "vigintillion " ] # Split the number into 3-digit groups with each group representing # hundreds, thousands etc. number_in_groups_of_3 = [] number_as_string = str(number) for position in range(3, 33, 3): progressive_number_string = number_as_string[-position:] progression = len(number_as_string) - position # Break if the end of the number string is encountered. if progression < -2: break else: if progression >= 0: number_in_groups_of_3.append(int(progressive_number_string[:3])) elif progression >= -1: number_in_groups_of_3.append(int(progressive_number_string[:2])) elif progression >= -2: number_in_groups_of_3.append(int(progressive_number_string[:1])) # Split the number 3-digit groups into groups of ones, tens etc. and build # an English text representation of the number. number_words = "" for index, group in enumerate(number_in_groups_of_3): number_1 = group % 10 number_2 = (group % 100) // 10 number_3 = (group % 1000) // 100 if group == 0: continue else: thousand = thousands[index] if number_2 == 0: number_words = ones[number_1] + thousand + number_words elif number_2 == 1: number_words = teens[number_1] + thousand + number_words elif number_2 > 1: number_words = tens[number_2] + ones[number_1] + thousand + number_words if number_3 > 0: number_words = ones[number_3] + "hundred " + number_words return number_words.strip(" ") def replace_numbers_in_text_with_English_text( text = None ): # Split the text into text and numbers. text = re.split("(\d+)", text) if text[-1] == "": text = text[:-1] text_translated = [] # Replace numbers with English text. for text_segment in text: if all(character.isdigit() for character in text_segment): text_translated.append(number_to_English_text(number = text_segment)) else: text_translated.append(text_segment) return "".join(text_translated) def replace_contractions_with_full_words_and_replace_numbers_with_digits( text = None, remove_articles = True ): """ This function replaces contractions with full words and replaces numbers with digits in specified text. There is the option to remove articles. """ words = text.split() text_translated = "" for word in words: if remove_articles and word in ["a", "an", "the"]: continue contractions_expansions = { "ain't": "is not", "aren't": "are not", "can't": "can not", "could've": "could have", "couldn't": "could not", "didn't": "did not", "doesn't": "does not", "don't": "do not", "gonna": "going to", "gotta": "got to", "hadn't": "had not", "hasn't": "has not", "haven't": "have not", "he'd": "he would", "he'll": "he will", "he's": "he is", "how'd": "how did", "how'll": "how will", "how's": "how is", "I'd": "I would", "I'll": "I will", "I'm": "I am", "I've": "I have", "isn't": "is not", "it'd": "it would", "it'll": "it will", "it's": "it is", "mightn't": "might not", "might've": "might have", "mustn't": "must not", "must've": "must have", "needn't": "need not", "oughtn't": "ought not", "shan't": "shall not", "she'd": "she would", "she'll": "she will", "she's": "she is", "shouldn't": "should not", "should've": "should have", "somebody's": "somebody is", "someone'd": "someone would", "someone'll": "someone will", "someone's": "someone is", "that'll": "that will", "that's": "that is", "that'd": "that would", "there'd": "there would", "there're": "there are", "there's": "there is", "they'd": "they would", "they'll": "they will", "they're": "they are", "they've": "they have", "wasn't": "was not", "we'd": "we would", "we'll": "we will", "we're": "we are", "we've": "we have", "weren't": "were not", "what'd": "what did", "what'll": "what will", "what're": "what are", "what's": "what is", "whats": "what is", "what've": "what have", "when's": "when is", "when'd": "when did", "where'd": "where did", "where's": "where is", "where've": "where have", "who'd": "who would", "who'd've": "who would have", "who'll": "who will", "who're": "who are", "who's": "who is", "who've": "who have", "why'd": "why did", "why're": "why are", "why's": "why is", "won't": "will not", "won't've": "will not have", "would've": "would have", "wouldn't": "would not", "wouldn't've": "would not have", "y'all": "you all", "ya'll": "you all", "you'd": "you would", "you'd've": "you would have", "you'll": "you will", "y'aint": "you are not", "y'ain't": "you are not", "you're": "you are", "you've": "you have" } if word in list(contractions_expansions.keys()): word = contractions_expansions[word] numbers_digits = { "zero": "0", "one": "1", "two": "2", "three": "3", "four": "4", "five": "5", "six": "6", "seven": "7", "eight": "8", "nine": "9", "ten": "10", "eleven": "11", "twelve": "12", "thirteen": "13", "fourteen": "14", "fifteen": "15", "sixteen": "16", "seventeen": "17", "eighteen": "18", "nineteen": "19", "twenty": "20" } if word in list(numbers_digits.keys()): word = numbers_digits[word] text_translated += " " + word text_translated = text_translated.strip() return text_translated def split_into_sentences( text = None ): capitals = "([A-Z])" prefixes = "(Dr|dr|Hon|hon|Mr|mr|Mrs|mrs|Ms|ms|St|st)[.]" suffixes = "(Co|co|Inc|inc|Jr|jr|Ltd|ltd|Sr|sr)" starters = "(But\s|Dr|He\s|However\s|It\s|Mr|Mrs|Ms|Our\s|She\s|That\s|Their\s|They\s|This\s|We\s|Wherever)" acronyms = "([A-Z][.][A-Z][.](?:[A-Z][.])?)" websites = "[.](com|gov|io|net|org|pro)" text = " " + text + " " text = text.replace("\n", " ") text = re.sub(prefixes, "\\1", text) text = re.sub(websites, "\\1", text) if "Ph.D" in text: text = text.replace("Ph.D.", "PhD") text = re.sub("\s" + capitals + "[.] ", " \\1 ", text) text = re.sub(acronyms + " " + starters, "\\1 \\2", text) text = re.sub(capitals + "[.]" + capitals + "[.]" + capitals + "[.]","\\1\\2\\3", text) text = re.sub(capitals + "[.]" + capitals + "[.]", "\\1\\2", text) text = re.sub(" " + suffixes + "[.] " + starters, " \\1 \\2", text) text = re.sub(" " + suffixes + "[.]", " \\1", text) text = re.sub(" " + capitals + "[.]", " \\1", text) if "”" in text: text = text.replace(".”", "”.") if "\"" in text: text = text.replace(".\"", "\".") if "!" in text: text = text.replace("!\"", "\"!") if "?" in text: text = text.replace("?\"", "\"?") text = text.replace(".", ".") text = text.replace("?", "?") text = text.replace("!", "!") text = text.replace("", ".") sentences = text.split("") sentences = sentences[:-1] sentences = [sentence.strip() for sentence in sentences] return sentences def trim_incomplete_sentences( text = None ): return " ".join(split_into_sentences(text)[1:]) def pseudorandom_MAC_address(): return "{aa:02x}:{bb:02x}:{cc:02x}:{dd:02x}:{ee:02x}:{ff:02x}".format( aa = random.randint(0, 255), bb = random.randint(0, 255), cc = random.randint(0, 255), dd = random.randint(0, 255), ee = random.randint(0, 255), ff = random.randint(0, 255) ) def get_attribute( object_instance = None, name = None, imputation_default_value = None ): try: if "[" in name and "]" in name: index = int(name.split("[")[1].split("]")[0]) attribute = name.split("[")[0] value = getattr(object_instance, attribute)[index] else: value = getattr(object_instance, name) except: value = imputation_default_value return value def generate_Python_variable_names( number = 10 ): names = [] while len(names) < number: name = str(uuid.uuid4()).replace("-", "") if name[0].isalpha(): names.append(name) return names def add_time_variables(df, reindex = True): """ Return a DataFrame with variables for weekday index, weekday name, timedelta through day, fraction through day, hours through day and days through week added, optionally with the index set to datetime and the variable `datetime` removed. It is assumed that the variable `datetime` exists. """ if not "datetime" in df.columns: log.error("field datetime not found in DataFrame") return False df["datetime"] = pd.to_datetime(df["datetime"]) df["month"] = df["datetime"].dt.month df["month_name"] = df["datetime"].dt.strftime("%B") df["weekday"] = df["datetime"].dt.weekday df["weekday_name"] = df["datetime"].dt.weekday_name df["time_through_day"] = df["datetime"].map( lambda x: x - datetime.datetime.combine( x.date(), datetime.time() ) ) df["fraction_through_day"] = df["time_through_day"].map( lambda x: x / datetime.timedelta(hours = 24) ) df["hour"] = df["datetime"].dt.hour df["hours_through_day"] = df["fraction_through_day"] * 24 df["days_through_week"] = df.apply( lambda row: row["weekday"] + row["fraction_through_day"], axis = 1 ) df["days_through_year"] = df["datetime"].dt.dayofyear df.index = df["datetime"] #del df["datetime"] return df def daily_plots( df, variable, renormalize = True, plot = True, scatter = False, linestyle = "-", linewidth = 1, s = 1 ): """ Create daily plots of a variable in a DataFrame, optionally renormalized. It is assumed that the DataFrame index is datetime. """ if not df.index.dtype in ["datetime64[ns]", "M8[ns]"]: log.error("index is not datetime") return False days = [] for group in df.groupby(df.index.day): days.append(group[1]) scaler = MinMaxScaler() plt.xlabel("hours") plt.ylabel(variable); for day in days: if renormalize: values = scaler.fit_transform(day[[variable]]) else: values = day[variable] if plot: plt.plot(day["hours_through_day"], values, linestyle = linestyle, linewidth = linewidth) if scatter: plt.scatter(day["hours_through_day"], values, s = s) def weekly_plots( df, variable, renormalize = True, plot = True, scatter = False, linestyle = "-", linewidth = 1, s = 1 ): """ Create weekly plots of a variable in a DataFrame, optionally renormalized. It is assumed that the variable `days_through_week` exists. """ if not "days_through_week" in df.columns: log.error("field days_through_week not found in DataFrame") return False weeks = [] for group in df.groupby(df.index.week): weeks.append(group[1]) scaler = MinMaxScaler() plt.ylabel(variable); for week in weeks: if renormalize: values = scaler.fit_transform(week[[variable]]) else: values = week[variable] if plot: plt.plot(week["days_through_week"], values, linestyle = linestyle, linewidth = linewidth) if scatter: plt.scatter(week["days_through_week"], values, s = s) plt.xticks( [ 0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5], ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"] ) def yearly_plots( df, variable, renormalize = True, horizontal_axis_labels_days = False, horizontal_axis_labels_months = True, plot = True, scatter = False, linestyle = "-", linewidth = 1, s = 1 ): """ Create yearly plots of a variable in a DataFrame, optionally renormalized. It is assumed that the DataFrame index is datetime. """ if not df.index.dtype in ["datetime64[ns]", "M8[ns]"]: log.error("index is not datetime") return False years = [] for group in df.groupby(df.index.year): years.append(group[1]) scaler = MinMaxScaler() plt.xlabel("days") plt.ylabel(variable); for year in years: if renormalize: values = scaler.fit_transform(year[[variable]]) else: values = year[variable] if plot: plt.plot(year["days_through_year"], values, linestyle = linestyle, linewidth = linewidth, label = year.index.year.values[0]) if scatter: plt.scatter(year["days_through_year"], values, s = s) if horizontal_axis_labels_months: plt.xticks( [ 15.5, 45, 74.5, 105, 135.5, 166, 196.5, 227.5, 258, 288.5, 319, 349.5], ["January", "February", "March", "April", "May", "June", "July", "August", "September", "October", "November", "December"] ) plt.legend() def add_rolling_statistics_variables( df = None, variable = None, window = 20, upper_factor = 2, lower_factor = 2 ): """ Add rolling statistics variables derived from a specified variable in a DataFrame. """ df[variable + "_rolling_mean"] = pd.stats.moments.rolling_mean(df[variable], window) df[variable + "_rolling_standard_deviation"] = pd.stats.moments.rolling_std(df[variable], window) df[variable + "_rolling_upper_bound"] = df[variable + "_rolling_mean"] + upper_factor * df[variable + "_rolling_standard_deviation"] df[variable + "_rolling_lower_bound"] = df[variable + "_rolling_mean"] - lower_factor * df[variable + "_rolling_standard_deviation"] return df def rescale_variables( df, variables_include = [], variables_exclude = [] ): """ Rescale variables in a DataFrame, excluding variables with NaNs and strings, excluding specified variables, and including specified variables. """ variables_not_rescale = variables_exclude variables_not_rescale.extend(df.columns[df.isna().any()].tolist()) # variables with NaNs variables_not_rescale.extend(df.select_dtypes(include = ["object", "datetime", "timedelta"]).columns) # variables with strings variables_rescale = list(set(df.columns) - set(variables_not_rescale)) variables_rescale.extend(variables_include) scaler = MinMaxScaler() df[variables_rescale] = scaler.fit_transform(df[variables_rescale]) return df def histogram_hour_counts( df, variable ): """ Create a day-long histogram of counts of the variable for each hour. It is assumed that the DataFrame index is datetime and that the variable `hour` exists. """ if not df.index.dtype in ["datetime64[ns]", "M8[ns]"]: log.error("index is not datetime") return False counts = df.groupby(df.index.hour)[variable].count() counts.plot(kind = "bar", width = 1, rot = 0, alpha = 0.7) def histogram_day_counts( df, variable ): """ Create a week-long histogram of counts of the variable for each day. It is assumed that the DataFrame index is datetime and that the variable `weekday_name` exists. """ if not df.index.dtype in ["datetime64[ns]", "M8[ns]"]: log.error("index is not datetime") return False counts = df.groupby(df.index.weekday_name)[variable].count().reindex(calendar.day_name[0:]) counts.plot(kind = "bar", width = 1, rot = 0, alpha = 0.7) def histogram_month_counts( df, variable ): """ Create a year-long histogram of counts of the variable for each month. It is assumed that the DataFrame index is datetime and that the variable `month_name` exists. """ if not df.index.dtype in ["datetime64[ns]", "M8[ns]"]: log.error("index is not datetime") return False counts = df.groupby(df.index.strftime("%B"))[variable].count().reindex(calendar.month_name[1:]) counts.plot(kind = "bar", width = 1, rot = 0, alpha = 0.7) def setup_Jupyter(): """ Set up a Jupyter notebook with a few defaults. """ sns.set(context = "paper", font = "monospace") warnings.filterwarnings("ignore") pd.set_option("display.max_rows", 500) pd.set_option("display.max_columns", 500) plt.rcParams["figure.figsize"] = (17, 10) def log_progress( sequence, every = None, size = None, name = "items" ): """ Display a progress bar widget in a Jupyter notebook. Its dependencies must be enabled on launching Jupyter, such as in the following way: jupyter nbextension enable --py widgetsnbextension The progress bar can be used in a way like the following: for item in shijian.log_progress([1, 2, 3, 4, 5]): time.sleep(5) """ from ipywidgets import IntProgress, HTML, VBox from IPython.display import display is_iterator = False if size is None: try: size = len(sequence) except TypeError: is_iterator = True if size is not None: if every is None: if size <= 200: every = 1 else: every = int(size / 200) # every 0.5 % else: assert every is not None, "sequence is iterator, set every" if is_iterator: progress = IntProgress(min = 0, max = 1, value = 1) progress.bar_style = "info" else: progress = IntProgress(min = 0, max = size, value = 0) label = HTML() box = VBox(children = [label, progress]) display(box) index = 0 try: for index, record in enumerate(sequence, 1): if index == 1 or index % every == 0: if is_iterator: label.value = "{name}: {index} / ?".format( name = name, index = index ) else: progress.value = index label.value = u"{name}: {index} / {size}".format( name = name, index = index, size = size ) yield record except: progress.bar_style = "danger" raise else: progress.bar_style = "success" progress.value = index label.value = "{name}: {index}".format( name = name, index = str(index or "?") ) _main()