Source code for bento.util

import pandas as pd
import numpy as np
import math
import as px

from collections import defaultdict
from bento.common import logger, logutil, dictutil  # noqa

logging = logger.fancy_logger(__name__)

[docs]def desnake(text): """Turns underscores into spaces""" return str(text).strip().replace("_", " ")
def titlize(text): return desnake(text.title()) def get_unit(num): scale = ["f", "n", "u", "m", "", "K", "M", "G", "T", "P"] offset = 4 try: exp = min(9, offset + math.log10(num) / 3) if exp < 0: return num, "" exp = int(exp) sig = round(num / 10 ** ((exp - 4) * 3), 2) return sig, scale[exp] except Exception: return num, "" # @logutil.loginfo(level="debug") def prepare_transforms(inputs, dep_var="y"): transforms = [] dep_column = dictutil.extract_unique(f"{dep_var}_column", inputs, pop=False) for key, value in dictutil.extract("_transform", inputs, pop=False).items(): if not value: continue if "window" in key: transforms.append((dep_column, ["rolling", "mean"], [(value,), ()])) elif "calc" in key: if value == "Rate": transforms.append((dep_column, ["diff"], [()])) elif value == "Acceleration": transforms.append((dep_column, ["diff", "diff"], [(), ()])) elif value == "Cumulative": transforms.append((dep_column, ["cumsum"], [()])) elif "norm" in key: if value == "Max": transforms.append((dep_column, ["div"], [("trace.max",)])) elif value == "Other Series": transforms.append((dep_column, ["div"], [("ref.0",)])) return transforms # @logutil.loginfo(level="debug") def prepare_filters(inputs): filters = defaultdict(lambda: defaultdict(list)) logic = dictutil.extract_unique("filter_logic", inputs, default="or").lower() for key, values in dictutil.extract("_filter", inputs, pop=False).items(): col = key.replace("_filter", "") if not values: continue if not isinstance(values, list): values = [values] if "date" in key and len(values) == 2: filters["between"][col].extend(values) else: filters[logic][col].extend(values) return filters # NOTE Currently used for pie charts and ranking # @logutil.loginfo(level='debug') def filter_df(idf, filters): odf = idf for logic, columns in filters.items(): for column, values in columns.items(): if "datetime" in str(type(values[0])): values = [np.datetime64(item) for item in values] if logic == "between": # TODO generalize filters to handle types try: odf = odf[(odf[column] >= values[0]) & (odf[column] <= values[1])] except TypeError: odf = odf[ (odf[column].astype(int) >= values[0]) & (odf[column].astype(int) <= values[1]) # noqa ] elif logic == "or": odf = odf[odf[column].isin(values)] elif logic == "and": odf = odf[odf[column].isin(values)] return odf # @logutil.loginfo(level="debug") # TODO Figure out a way around this hack, which manually filters out None strings as # a substitute for properly dealing with bipartite dataframes def rank(idf, key, text_key, column, count=10, **kwargs): fdf = idf.groupby(key).sum().reset_index() fdf = fdf[fdf[text_key] != "None"] fdf = fdf.nlargest(count, column) return zip(fdf[text_key], fdf[column]) def apply_defaults(component_type, raw_inputs, data): inputs = {"variant": component_type.split(".")[-1]} inputs.update(raw_inputs) if not component_type.startswith("graph.normal"): logging.debug("No defaults implemented for non-normal graph components") return inputs if "x_column" not in inputs: possible = [col for col in data["df"].columns if "date" in col] inputs["x_column"] = possible[0] inputs["x_scale"] = "date""Graph x_column missing, setting to '{possible[0]}'")" ...also setting x_scale to 'date'") if "y_column" not in inputs: possible = [ key for key, val in data["types"].items() if "date" not in key and "date" != val ] inputs["y_column"] = possible[0]"Graph y_column missing, setting to '{possible[0]}'") return inputs # NOTE Used for preparing the traces for graphs # TODO Should combine this with filter_df/ # @logutil.loginfo(level="debug") def prepare_traces(idf, filters, key_columns): # NOTE Brought over from figure callback, default multi-column approach # TODO Figure out how to determine default columns from df # column ="keys",["columns"][0])[0] # def_x_column =["columns"][1] # idf.groupby(column).max().reset_index().nlargest(8, def_x_column)[column] idf["label"] = "" = "" traces = [idf] for logic, columns in filters.items(): new_traces = [] if logic == "between": for column, values in columns.items(): for df in traces: # TODO generalize filters to handle types try: new = df[(df[column] >= values[0]) & (df[column] <= values[1])] except TypeError: new = df[ (df[column].astype(int) >= values[0]) & (df[column].astype(int) <= values[1]) # noqa ] = new_traces.append(new) elif logic == "or": for column, values in columns.items(): for df in traces: for value in values: new = df[df[column] == value] = try: += " " + value except Exception: # try: += " " + pd.to_datetime(value).strftime("%Y-%m-%d") # except Exception: # logging.warning(f"Can't add {column} to trace name") new_traces.append(new) elif logic == "and": for column, values in columns.items(): for df in traces: for value in values: new = df[df[column] == value] = try: += " " + value except Exception: # try: += " " + pd.to_datetime(value).strftime("%Y-%m-%d") # except Exception: # logging.warning(f"Can't add {column} to trace name") new_traces.append(new) traces = new_traces new_traces = [] if key_columns: for df in traces: new = df.groupby(key_columns).sum().reset_index() = new_traces.append(new) else: new_traces = traces traces = new_traces return traces # @logutil.loginfo(level="debug") def trace_analytics(traces, transforms): for transform in transforms: column, operations, arg_list = transform for trace in traces: buff = trace[column] for op, args in zip(operations, arg_list): final_args = [] for arg in args: if "trace" in str(arg): oper = arg.split(".")[-1] final_args.append(getattr(buff, oper)()) elif "ref" in str(arg): idx = 0 final_args.append(traces[idx][column]) else: final_args.append(arg) buff = getattr(buff, op)(*final_args) trace[column] = buff return traces # @logutil.loginfo(level="debug") def aggregate(idf, y_column=None, filters=None, logic="sum", keys=None, **kwargs): filters = filters or {} filters.update(kwargs.get("fixed_filters", {})) # TODO Plenty of work to do cleaning up the data processing utilities like this keys = keys or ["date"] traces = prepare_traces(idf, filters, keys) agg_df = pd.concat(traces) # NOTE Pay attention to this block for multi-axis support if not y_column: return len(agg_df), "" elif isinstance(y_column, list): y_column = y_column[0] quantity = getattr(agg_df[y_column], logic)() return get_unit(quantity) def _date_marks(ordered): spacing = 7 style = {} labels = {item: pd.Timestamp(item).date().day for item in ordered[::spacing]} marks = { int(key): {"label": label, "style": style} for key, label in labels.items() } return marks def gen_marks(series, variant="auto"): """Processes a dataframe column into a valid slider series""" ordered = sorted(series) spacing = math.ceil(len(ordered) / 10) if variant == "date": marks = _date_marks(ordered) else: marks = {item: str(item) for item in series[::spacing]} return marks def gen_options(option_input, multi=False, default=None): # In this case, we're given just the set of options only, assuming first is default if isinstance(option_input, list): option_list = option_input base_default = [] if multi else option_list[0] default = default if default is not None else base_default # The default may be specified in the dict version elif isinstance(option_input, dict): # TODO Make this more robust if "value" in option_input: return option_input option_list = option_input["options"] base_default = [] if multi else option_list[0] default = ( default if default is not None else option_input.get("default", base_default) ) else: logging.warning(f"Unsupported type {type(option_input)} for options") logging.debug(option_input) # TODO Can we determine when we should run desnake on the entries? options = [{"label": desnake(item).title(), "value": item} for item in option_list] return {"options": options, "value": default} def get_first_numeric(data_types): for dtype in data_types: if "log" in data_types[dtype]: return dtype def log_color_scale(name, base=2.718, category="sequential"): color_category = getattr(px.colors, category) color_sequence = getattr(color_category, name) # print(color_sequence) log_val = [round(1 / base ** idx, 10) for idx in range(len(color_sequence))][::-1] log_val[0] = 0 log_sequence = list(zip(log_val, color_sequence)) return log_sequence # @logutil.loginfo(level='debug') def data_range(pd_df_list, column, scale="category"): minimum = pd_df_list[0][column].min() maximum = pd_df_list[0][column].max() for df in pd_df_list: series = df[column] minimum = min(minimum, series.min()) maximum = max(maximum, series.max()) if scale == "log": minimum = max(0.1, minimum) minimum = np.floor(max(-1, np.log10(minimum))) maximum = np.ceil(np.log10(maximum)) elif scale == "linear": minimum = np.floor(minimum) maximum = np.ceil(maximum) return [minimum, maximum] if __name__ == "__main__": log_color_scale("Viridis", base=10) for i in range(-18, 19): unit = "Hz" sig, scale = get_unit(10 ** i) print(f"{sig} {scale}{unit}")