"""Source code for the ``falkordb.execution_plan`` module."""

import re
from typing import List, Optional


class ProfileStats:
    """
    Runtime execution statistics recorded for a single plan operation.

    Attributes:
        records_produced (int): How many records the operation emitted.
        execution_time (float): Time spent in the operation, in milliseconds.
    """

    def __init__(self, records_produced: int, execution_time: float):
        """
        Stores the statistics reported for one operation.

        Args:
            records_produced (int): How many records the operation emitted.
            execution_time (float): Time spent in the operation, in milliseconds.
        """
        # Assignment order mirrors the original implementation.
        self.execution_time = execution_time
        self.records_produced = records_produced
class Operation:
    """
    A single node of an execution-plan tree.

    Attributes:
        name (str): The name of the operation.
        args (str): Operation arguments, or None when the operation has none.
        children (list): Child operations, in the order they were appended.
        profile_stats (ProfileStats): Profile statistics for the operation,
            or None when no statistics were supplied.
    """

    def __init__(
        self,
        name: str,
        args: Optional[str] = None,
        profile_stats: Optional[ProfileStats] = None,
    ):
        """
        Creates an operation with no children.

        Args:
            name (str): The name of the operation.
            args (str, optional): Operation arguments.
            profile_stats (ProfileStats, optional): Profile statistics for
                the operation.
        """
        self.name = name
        self.args = args
        self.children: List[Operation] = []
        self.profile_stats = profile_stats

    @property
    def execution_time(self) -> float:
        """
        The operation's execution time in milliseconds.

        Only meaningful when profile statistics are attached; accessing it
        otherwise fails the internal assertion.
        """
        stats = self.profile_stats
        assert stats is not None
        return stats.execution_time

    @property
    def records_produced(self) -> int:
        """
        The number of records the operation produced.

        Only meaningful when profile statistics are attached; accessing it
        otherwise fails the internal assertion.
        """
        stats = self.profile_stats
        assert stats is not None
        return stats.records_produced

    def append_child(self, child):
        """
        Adds ``child`` as the last child of this operation.

        Args:
            child (Operation): The operation to attach.

        Returns:
            Operation: ``self``, so calls can be chained.

        Raises:
            Exception: If ``child`` is not an Operation.
        """
        if isinstance(child, Operation):
            self.children.append(child)
            return self
        raise Exception("child must be Operation")

    def child_count(self) -> int:
        """
        Returns how many direct children this operation has.

        Returns:
            int: Number of child operations.
        """
        return len(self.children)

    def __eq__(self, o: object) -> bool:
        """
        Two operations are equal when their names and arguments match.

        Children and profile statistics are deliberately not compared here.

        Args:
            o (object): The object to compare against.

        Returns:
            bool: True if ``o`` is an Operation with the same name and args.
        """
        if isinstance(o, Operation):
            return self.name == o.name and self.args == o.args
        return False

    def __str__(self) -> str:
        """
        Renders the operation as ``name`` or ``name | args``.

        Returns:
            str: String representation of the operation.
        """
        if self.args is None:
            return f"{self.name}"
        return f"{self.name} | {self.args}"
class ExecutionPlan:
    """
    ExecutionPlan Class for representing a collection of operations.

    Each entry of the textual plan describes one operation as
    ``name[ | args][ | Records produced: N, Execution time: T ms]`` and is
    prefixed by one indentation unit per nesting level below the root.

    Attributes:
        plan (list): List of strings representing the collection of operations.
        structured_plan (Operation): Root of the structured operation tree.
        operations (dict): Maps each operation name to the list of Operation
            instances with that name, in reverse order of appearance in the plan.
    """

    # One nesting level in the textual plan. FalkorDB indents each level by
    # four spaces; counting single spaces would also count the spaces inside
    # operation names/args (e.g. "Node By Label Scan | (n:L)").
    _INDENT = "    "
    # Profile statistics that may trail an operation entry.
    _RECORDS_RE = re.compile(r"Records produced: (\d+)")
    _EXECUTION_TIME_RE = re.compile(r"Execution time: (\d+(?:\.\d+)?) ms")

    def __init__(self, plan):
        """
        Creates a new ExecutionPlan instance.

        Args:
            plan (list): List of strings (or bytes) representing the
                collection of operations.

        Raises:
            Exception: If ``plan`` is not a list, or it does not describe a
                single well-formed operation tree ("corrupted plan").
        """
        if not isinstance(plan, list):
            raise Exception("plan must be an array")

        # Decode entry by entry: an empty plan or a mixed str/bytes list must
        # not fail on a plan[0] probe.
        self.plan = [
            entry.decode() if isinstance(entry, bytes) else entry for entry in plan
        ]
        self.operations = {}
        self.structured_plan = self._operation_tree()

        # The tree is built top-down; expose each name's operations reversed.
        for ops in self.operations.values():
            ops.reverse()

    def collect_operations(self, op_name):
        """
        Collects all operations with specified name from plan

        Args:
            op_name (string): Name of operation to collect

        Returns:
            List[Operation]: All operations with the specified name (an empty
            list when there are none).
        """
        return self.operations.get(op_name, [])

    def __compare_operations(self, root_a, root_b) -> bool:
        """
        Compares execution plan operation trees.

        Returns:
            bool: True if operation trees are equal, False otherwise.
        """
        # compare current roots (name + args)
        if root_a != root_b:
            return False
        # make sure both roots have the same number of children
        if root_a.child_count() != root_b.child_count():
            return False
        # recursively compare children pairwise
        return all(
            self.__compare_operations(child_a, child_b)
            for child_a, child_b in zip(root_a.children, root_b.children)
        )

    def __str__(self) -> str:
        """
        Returns a string representation of the execution plan.

        Children are indented by the same unit the parser expects, so the
        output has the same layout as the textual plan.

        Returns:
            str: String representation of the execution plan.
        """
        indent = self._INDENT

        def aggregate_str(str_children):
            # indent every line of every child subtree by one level
            return "\n".join(
                indent + line
                for str_child in str_children
                for line in str_child.splitlines()
            )

        def combine_str(x, y):
            return f"{x}\n{y}"

        return self._operation_traverse(
            self.structured_plan, str, aggregate_str, combine_str
        )

    def __eq__(self, o: object) -> bool:
        """
        Compares two execution plans.

        Returns:
            bool: True if the two plans are equal, False otherwise.
        """
        # make sure 'o' is an execution-plan
        if not isinstance(o, ExecutionPlan):
            return False
        # compare execution trees starting at both roots
        return self.__compare_operations(self.structured_plan, o.structured_plan)

    def __iter__(self):
        """Iterates over the names of the operations in the plan."""
        return iter(self.operations)

    def _operation_traverse(self, op, op_f, aggregate_f, combine_f):
        """
        Traverses the operation tree recursively applying functions.

        Args:
            op: Operation to traverse.
            op_f: Function applied for each operation.
            aggregate_f: Aggregation function applied for all children of a
                single operation.
            combine_f: Combine function applied for the operation result and
                the children result.
        """
        # apply op_f for each operation
        op_res = op_f(op)
        if not op.children:
            return op_res  # leaf: nothing to combine
        # apply _operation_traverse recursively
        children = [
            self._operation_traverse(child, op_f, aggregate_f, combine_f)
            for child in op.children
        ]
        # combine the operation result with the children aggregated result
        return combine_f(op_res, aggregate_f(children))

    @classmethod
    def _line_depth(cls, line):
        """Returns the nesting depth of a plan entry from its leading indentation."""
        leading = len(line) - len(line.lstrip(" "))
        return leading // len(cls._INDENT)

    @classmethod
    def _create_operation(cls, parts):
        """
        Builds an Operation from a plan entry split on "|".

        Args:
            parts (list): ``[name, args?, profile_stats?]`` segments.

        Returns:
            Operation: The parsed operation.

        Raises:
            Exception: ("corrupted plan") if a profile segment is malformed.
        """
        name = parts[0].strip()
        rest = parts[1:]
        profile_stats = None
        if rest and "Records produced" in rest[-1]:
            records = cls._RECORDS_RE.search(rest[-1])
            execution_time = cls._EXECUTION_TIME_RE.search(rest[-1])
            if records is None or execution_time is None:
                raise Exception("corrupted plan")
            profile_stats = ProfileStats(
                int(records.group(1)), float(execution_time.group(1))
            )
            rest = rest[:-1]
        return Operation(name, rest[0].strip() if rest else None, profile_stats)

    def _operation_tree(self):
        """
        Builds the operation tree from the string representation.

        An entry at depth ``d`` becomes the last child of the most recent
        entry at depth ``d - 1``. Every operation is also recorded in
        ``self.operations`` under its name, in plan order.

        Returns:
            Operation: Root of the structured operation tree.

        Raises:
            Exception: ("corrupted plan") if the plan is empty, has more than
                one root, or an entry skips a nesting level.
        """
        root = None
        # ancestors[d] is the most recent operation at depth d, i.e. the only
        # valid parent for an entry at depth d + 1.
        ancestors = []

        for line in self.plan:
            depth = self._line_depth(line)
            op = self._create_operation(line.split("|"))
            self.operations.setdefault(op.name, []).append(op)

            if depth == 0:
                if root is not None:
                    raise Exception("corrupted plan")  # a plan has a single root
                root = op
            elif depth <= len(ancestors):
                ancestors[depth - 1].append_child(op)
            else:
                # deeper than its would-be parent + 1: a level was skipped
                raise Exception("corrupted plan")

            # forget ancestors that are no longer on the path to this node
            del ancestors[depth:]
            ancestors.append(op)

        if root is None:
            raise Exception("corrupted plan")
        return root