Source code for graphix.gflow

"""Flow finding algorithm.

For a given underlying graph (G, I, O, meas_plane), this method finds a (generalized) flow [NJP 9, 250 (2007)]
in polynomincal time.
In particular, this outputs gflow with minimum depth, maximally delayed gflow.

Ref: Mhalla and Perdrix, International Colloquium on Automata,
Languages, and Programming (Springer, 2008), pp. 857-868.
Ref: Backens et al., Quantum 5, 421 (2021).

"""

from __future__ import annotations

from copy import deepcopy
from itertools import product
from typing import TYPE_CHECKING

import networkx as nx
import numpy as np
import sympy as sp
from typing_extensions import assert_never

from graphix.command import CommandKind
from graphix.fundamentals import Axis, Plane
from graphix.linalg import MatGF2
from graphix.measurements import PauliMeasurement

if TYPE_CHECKING:
    from collections.abc import Mapping
    from collections.abc import Set as AbstractSet

    from graphix.parameter import ExpressionOrFloat
    from graphix.pattern import Pattern


# TODO: This should be ensured by type-checking.
def check_meas_planes(meas_planes: dict[int, Plane]) -> None:
    """Check that all planes are valid planes."""
    for node, plane in meas_planes.items():
        if not isinstance(plane, Plane):
            raise TypeError(f"Measure plane for {node} is `{plane}`, which is not an instance of `Plane`")


[docs] def find_gflow( graph: nx.Graph[int], iset: set[int], oset: set[int], meas_planes: dict[int, Plane], mode: str = "single", ) -> tuple[dict[int, set[int]], dict[int, int]]: """Maximally delayed gflow finding algorithm. For open graph g with input, output, and measurement planes, this returns maximally delayed gflow. gflow consist of function g(i) where i is the qubit labels, and strict partial ordering < or layers labels l_k where each element specify the order of qubits to be measured to maintain determinism in MBQC. In practice, we must measure qubits in order specified in array l_k (increasing order of l_k from 1), and for each measurements of qubit i we must perform corrections on qubits in g(i), depending on the measurement outcome. For more details of gflow, see Browne et al., NJP 9, 250 (2007). We use the extended gflow finding algorithm in Backens et al., Quantum 5, 421 (2021). Parameters ---------- graph: :class:`networkx.Graph` Graph (incl. input and output) iset: set set of node labels for input oset: set set of node labels for output meas_planes: dict measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. mode: str The gflow finding algorithm can yield multiple equivalent solutions. So there are three options - "single": Returrns a single solution - "all": Returns all possible solutions - "abstract": Returns an abstract solution. Uncertainty is represented with sympy.Symbol objects, requiring user substitution to get a concrete answer. Optional. Default is "single". Returns ------- g: dict gflow function. g[i] is the set of qubits to be corrected for the measurement of qubit i. l_k: dict layers obtained by gflow algorithm. l_k[d] is a node set of depth d. """ check_meas_planes(meas_planes) l_k = {} g = {} for node in graph.nodes: l_k[node] = 0 return gflowaux(graph, iset, oset, meas_planes, 1, l_k, g, mode=mode)
def gflowaux( graph: nx.Graph, iset: set[int], oset: set[int], meas_planes: dict[int, Plane], k: int, l_k: dict[int, int], g: dict[int, set[int]], mode: str = "single", ): """Find one layer of the gflow. Ref: Backens et al., Quantum 5, 421 (2021). Parameters ---------- graph: :class:`networkx.Graph` Graph (incl. input and output) iset: set set of node labels for input oset: set set of node labels for output meas_planes: dict measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. k: int current layer number. l_k: dict layers obtained by gflow algorithm. l_k[d] is a node set of depth d. g: dict gflow function. g[i] is the set of qubits to be corrected for the measurement of qubit i. mode: str(optional) The gflow finding algorithm can yield multiple equivalent solutions. So there are three options - "single": Returrns a single solution - "all": Returns all possible solutions - "abstract": Returns an abstract solution. Uncertainty is represented with sympy.Symbol objects, requiring user substitution to get a concrete answer. Returns ------- g: dict gflow function. g[i] is the set of qubits to be corrected for the measurement of qubit i. l_k: dict layers obtained by gflow algorithm. l_k[d] is a node set of depth d. """ nodes = set(graph.nodes) if oset == nodes: return g, l_k non_output = nodes - oset correction_candidate = oset - iset adj_mat, node_order_list = get_adjacency_matrix(graph) node_order_row = node_order_list.copy() node_order_row.sort() node_order_col = node_order_list.copy() node_order_col.sort() for out in oset: adj_mat.remove_row(node_order_row.index(out)) node_order_row.remove(out) adj_mat_row_reduced = adj_mat.copy() # later use for construct RHS for node in nodes - correction_candidate: adj_mat.remove_col(node_order_col.index(node)) node_order_col.remove(node) b = MatGF2(np.zeros((adj_mat.data.shape[0], len(non_output)), dtype=int)) for i_row in range(len(node_order_row)): node = node_order_row[i_row] vec = MatGF2(np.zeros(len(node_order_row), dtype=int)) if meas_planes[node] == Plane.XY: vec.data[i_row] = 1 elif meas_planes[node] == Plane.XZ: vec.data[i_row] = 1 vec_add = adj_mat_row_reduced.data[:, node_order_list.index(node)] vec += vec_add elif meas_planes[node] == Plane.YZ: vec.data = adj_mat_row_reduced.data[:, node_order_list.index(node)].reshape(vec.data.shape) b.data[:, i_row] = vec.data adj_mat, b, _, col_permutation = adj_mat.forward_eliminate(b) x, kernels = adj_mat.backward_substitute(b) corrected_nodes = set() for i_row in range(len(node_order_row)): non_out_node = node_order_row[i_row] x_col = x[:, i_row] if 0 in x_col.shape or x_col[0] == sp.nan: # no solution continue if mode == "single": sol_list = [x_col[i].subs(zip(kernels, [sp.false] * len(kernels))) for i in range(len(x_col))] sol = np.array(sol_list) sol_index = sol.nonzero()[0] g[non_out_node] = {node_order_col[col_permutation.index(i)] for i in sol_index} if meas_planes[non_out_node] in {Plane.XZ, Plane.YZ}: g[non_out_node] |= {non_out_node} elif mode == "all": g[non_out_node] = set() binary_combinations = product([0, 1], repeat=len(kernels)) for binary_combination in binary_combinations: sol_list = [x_col[i].subs(zip(kernels, binary_combination)) for i in range(len(x_col))] sol = np.array(sol_list) sol_index = sol.nonzero()[0] g_i = {node_order_col[col_permutation.index(i)] for i in sol_index} if meas_planes[non_out_node] in {Plane.XZ, Plane.YZ}: g_i |= {non_out_node} g[non_out_node] |= {frozenset(g_i)} elif mode == "abstract": g[non_out_node] = {} for i in range(len(x_col)): node = node_order_col[col_permutation.index(i)] g[non_out_node][node] = x_col[i] if meas_planes[non_out_node] in {Plane.XZ, Plane.YZ}: g[non_out_node][non_out_node] = sp.true l_k[non_out_node] = k corrected_nodes |= {non_out_node} if len(corrected_nodes) == 0: if oset == nodes: return g, l_k return None, None return gflowaux( graph, iset, oset | corrected_nodes, meas_planes, k + 1, l_k, g, mode=mode, )
[docs] def find_flow( graph: nx.Graph[int], iset: set[int], oset: set[int], meas_planes: dict[int, Plane] | None = None, ) -> tuple[dict[int, set[int]], dict[int, int]]: """Causal flow finding algorithm. For open graph g with input, output, and measurement planes, this returns causal flow. For more detail of causal flow, see Danos and Kashefi, PRA 74, 052310 (2006). Original algorithm by Mhalla and Perdrix, International Colloquium on Automata, Languages, and Programming (2008), pp. 857-868. Parameters ---------- graph: :class:`networkx.Graph` Graph (incl. input and output) iset: set set of node labels for input oset: set set of node labels for output meas_planes: dict(int, Plane) measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. Note that an underlying graph has a causal flow only if all measurement planes are Plane.XY. If not specified, all measurement planes are interpreted as Plane.XY. Returns ------- f: list of nodes causal flow function. f[i] is the qubit to be measured after qubit i. l_k: dict layers obtained by gflow algorithm. l_k[d] is a node set of depth d. """ check_meas_planes(meas_planes) nodes = set(graph.nodes) edges = set(graph.edges) if meas_planes is None: meas_planes = dict.fromkeys(nodes - oset, Plane.XY) for plane in meas_planes.values(): if plane != Plane.XY: return None, None l_k = dict.fromkeys(nodes, 0) f = {} k = 1 v_c = oset - iset return flowaux(nodes, edges, iset, oset, v_c, f, l_k, k)
def flowaux( nodes: set[int], edges: set[tuple[int, int]], iset: set[int], oset: set[int], v_c: set[int], f: dict[int, set[int]], l_k: dict[int, int], k: int, ): """Find one layer of the flow. Ref: Mhalla and Perdrix, International Colloquium on Automata, Languages, and Programming (Springer, 2008), pp. 857-868. Parameters ---------- nodes: set labels of all qubits (nodes) edges: set edges iset: set set of node labels for input oset: set set of node labels for output v_c: set correction candidate qubits f: dict flow function. f[i] is the qubit to be measured after qubit i. l_k: dict layers obtained by flow algorithm. l_k[d] is a node set of depth d. k: int current layer number. meas_planes: dict measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. Outputs ------- f: list of nodes causal flow function. f[i] is the qubit to be measured after qubit i. l_k: dict layers obtained by gflow algorithm. l_k[d] is a node set of depth d. """ v_out_prime = set() c_prime = set() for q in v_c: nb = search_neighbor(q, edges) p_set = nb & (nodes - oset) if len(p_set) == 1: # Iterate over p_set assuming there is only one element p (p,) = p_set f[p] = {q} l_k[p] = k v_out_prime |= {p} c_prime |= {q} # determine whether there exists flow if not v_out_prime: if oset == nodes: return f, l_k return None, None return flowaux( nodes, edges, iset, oset | v_out_prime, (v_c - c_prime) | (v_out_prime & (nodes - iset)), f, l_k, k + 1, )
[docs] def find_pauliflow( graph: nx.Graph[int], iset: set[int], oset: set[int], meas_planes: dict[int, Plane], meas_angles: Mapping[int, ExpressionOrFloat], mode: str = "single", ) -> tuple[dict[int, set[int]], dict[int, int]]: """Maximally delayed Pauli flow finding algorithm. For open graph g with input, output, measurement planes and measurement angles, this returns maximally delayed Pauli flow. Pauli flow consist of function p(i) where i is the qubit labels, and strict partial ordering < or layers labels l_k where each element specify the order of qubits to be measured to maintain determinism in MBQC. In practice, we must measure qubits in order specified in array l_k (increasing order of l_k from 1), and for each measurements of qubit i we must perform corrections on qubits in p(i), depending on the measurement outcome. For more details of Pauli flow and the finding algorithm used in this method, see Simmons et al., EPTCS 343, 2021, pp. 50-101 (arXiv:2109.05654). Parameters ---------- graph: :class:`networkx.Graph` Graph (incl. input and output) iset: set set of node labels for input oset: set set of node labels for output meas_planes: dict measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. meas_angles: dict measurement angles for each qubits. meas_angles[i] is the measurement angle for qubit i. mode: str The Pauliflow finding algorithm can yield multiple equivalent solutions. So there are three options - "single": Returrns a single solution - "all": Returns all possible solutions - "abstract": Returns an abstract solution. Uncertainty is represented with sympy.Symbol objects, requiring user substitution to get a concrete answer. Optional. Default is "single". Returns ------- p: dict Pauli flow function. p[i] is the set of qubits to be corrected for the measurement of qubit i. l_k: dict layers obtained by Pauli flow algorithm. l_k[d] is a node set of depth d. """ check_meas_planes(meas_planes) l_k = {} p = {} l_x, l_y, l_z = get_pauli_nodes(meas_planes, meas_angles) for node in graph.nodes: if node in oset: l_k[node] = 0 return pauliflowaux(graph, iset, oset, meas_planes, 0, set(), oset, l_k, p, (l_x, l_y, l_z), mode)
def pauliflowaux( graph: nx.Graph, iset: set[int], oset: set[int], meas_planes: dict[int, Plane], k: int, correction_candidate: set[int], solved_nodes: set[int], l_k: dict[int, int], p: dict[int, set[int]], ls: tuple[set[int], set[int], set[int]], mode: str = "single", ): """Find one layer of the Pauli flow. Ref: Simmons et al., EPTCS 343, 2021, pp. 50-101 (arXiv:2109.05654). Parameters ---------- graph: :class:`networkx.Graph` Graph (incl. input and output) iset: set set of node labels for input oset: set set of node labels for output meas_planes: dict measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. k: int current layer number. correction_candidate: set set of qubits to be corrected. solved_nodes: set set of qubits whose layers are already determined. l_k: dict layers obtained by gflow algorithm. l_k[d] is a node set of depth d. p: dict Pauli flow function. p[i] is the set of qubits to be corrected for the measurement of qubit i. ls: tuple ls = (l_x, l_y, l_z) where l_x, l_y, l_z are sets of qubits whose measurement operators are X, Y, Z, respectively. mode: str(optional) The Pauliflow finding algorithm can yield multiple equivalent solutions. So there are three options - "single": Returrns a single solution - "all": Returns all possible solutions - "abstract": Returns an abstract solution. Uncertainty is represented with sympy.Symbol objects, requiring user substitution to get a concrete answer. Returns ------- p: dict Pauli flow function. p[i] is the set of qubits to be corrected for the measurement of qubit i. l_k: dict layers obtained by Pauli flow algorithm. l_k[d] is a node set of depth d. """ l_x, l_y, l_z = ls solved_update = set() nodes = set(graph.nodes) if oset == nodes: return p, l_k unsolved_nodes = nodes - solved_nodes adj_mat, node_order_list = get_adjacency_matrix(graph) adj_mat_w_id = adj_mat.copy() + MatGF2(np.identity(adj_mat.data.shape[0], dtype=int)) node_order_row = node_order_list.copy() node_order_row_lower = node_order_list.copy() node_order_col = node_order_list.copy() p_bar = correction_candidate | l_y | l_z pset = nodes - p_bar kset = (correction_candidate | l_x | l_y) & (nodes - iset) yset = l_y - correction_candidate for node in unsolved_nodes: adj_mat_ = adj_mat.copy() adj_mat_w_id_ = adj_mat_w_id.copy() node_order_row_ = node_order_row.copy() node_order_row_lower_ = node_order_row_lower.copy() node_order_col_ = node_order_col.copy() for node_ in nodes - (pset | {node}): adj_mat_.remove_row(node_order_row_.index(node_)) node_order_row_.remove(node_) for node_ in nodes - (yset - {node}): adj_mat_w_id_.remove_row(node_order_row_lower_.index(node_)) node_order_row_lower_.remove(node_) for node_ in nodes - (kset - {node}): adj_mat_.remove_col(node_order_col_.index(node_)) adj_mat_w_id_.remove_col(node_order_col_.index(node_)) node_order_col_.remove(node_) adj_mat_.concatenate(adj_mat_w_id_, axis=0) if mode == "all": p[node] = set() if mode == "abstract": p[node] = [] solved = False if meas_planes[node] == Plane.XY or node in l_x or node in l_y: mat = MatGF2(np.zeros((len(node_order_row_), 1), dtype=int)) mat.data[node_order_row_.index(node), :] = 1 mat_lower = MatGF2(np.zeros((len(node_order_row_lower_), 1), dtype=int)) mat.concatenate(mat_lower, axis=0) adj_mat_xy, mat, _, col_permutation_xy = adj_mat_.forward_eliminate(mat, copy=True) x_xy, kernels = adj_mat_xy.backward_substitute(mat) if 0 not in x_xy.shape and x_xy[0, 0] != sp.nan: solved_update |= {node} x_xy = x_xy[:, 0] l_k[node] = k if mode == "single": sol_list = [x_xy[i].subs(zip(kernels, [sp.false] * len(kernels))) for i in range(len(x_xy))] sol = np.array(sol_list) sol_index = sol.nonzero()[0] p[node] = {node_order_col_[col_permutation_xy.index(i)] for i in sol_index} solved = True elif mode == "all": binary_combinations = product([0, 1], repeat=len(kernels)) for binary_combination in binary_combinations: sol_list = [x_xy[i].subs(zip(kernels, binary_combination)) for i in range(len(x_xy))] sol = np.array(sol_list) sol_index = sol.nonzero()[0] p_i = {node_order_col_[col_permutation_xy.index(i)] for i in sol_index} p[node].add(frozenset(p_i)) elif mode == "abstract": p_i = {} for i in range(len(x_xy)): node_temp = node_order_col_[col_permutation_xy.index(i)] p_i[node_temp] = x_xy[i] p[node].append(p_i) if not solved and (meas_planes[node] == Plane.XZ or node in l_z or node in l_x): mat = MatGF2(np.zeros((len(node_order_row_), 1), dtype=int)) mat.data[node_order_row_.index(node)] = 1 for neighbor in search_neighbor(node, graph.edges): if neighbor in pset | {node}: mat.data[node_order_row_.index(neighbor), :] = 1 mat_lower = MatGF2(np.zeros((len(node_order_row_lower_), 1), dtype=int)) for neighbor in search_neighbor(node, graph.edges): if neighbor in yset - {node}: mat_lower.data[node_order_row_lower_.index(neighbor), :] = 1 mat.concatenate(mat_lower, axis=0) adj_mat_xz, mat, _, col_permutation_xz = adj_mat_.forward_eliminate(mat, copy=True) x_xz, kernels = adj_mat_xz.backward_substitute(mat) if 0 not in x_xz.shape and x_xz[0, 0] != sp.nan: solved_update |= {node} x_xz = x_xz[:, 0] l_k[node] = k if mode == "single": sol_list = [x_xz[i].subs(zip(kernels, [sp.false] * len(kernels))) for i in range(len(x_xz))] sol = np.array(sol_list) sol_index = sol.nonzero()[0] p[node] = {node_order_col_[col_permutation_xz.index(i)] for i in sol_index} | {node} solved = True elif mode == "all": binary_combinations = product([0, 1], repeat=len(kernels)) for binary_combination in binary_combinations: sol_list = [x_xz[i].subs(zip(kernels, binary_combination)) for i in range(len(x_xz))] sol = np.array(sol_list) sol_index = sol.nonzero()[0] p_i = {node_order_col_[col_permutation_xz.index(i)] for i in sol_index} | {node} p[node].add(frozenset(p_i)) elif mode == "abstract": p_i = {} for i in range(len(x_xz)): node_temp = node_order_col_[col_permutation_xz.index(i)] p_i[node_temp] = x_xz[i] p_i[node] = sp.true p[node].append(p_i) if not solved and (meas_planes[node] == Plane.YZ or node in l_y or node in l_z): mat = MatGF2(np.zeros((len(node_order_row_), 1), dtype=int)) for neighbor in search_neighbor(node, graph.edges): if neighbor in pset | {node}: mat.data[node_order_row_.index(neighbor), :] = 1 mat_lower = MatGF2(np.zeros((len(node_order_row_lower_), 1), dtype=int)) for neighbor in search_neighbor(node, graph.edges): if neighbor in yset - {node}: mat_lower.data[node_order_row_lower_.index(neighbor), :] = 1 mat.concatenate(mat_lower, axis=0) adj_mat_yz, mat, _, col_permutation_yz = adj_mat_.forward_eliminate(mat, copy=True) x_yz, kernels = adj_mat_yz.backward_substitute(mat) if 0 not in x_yz.shape and x_yz[0, 0] != sp.nan: solved_update |= {node} x_yz = x_yz[:, 0] l_k[node] = k if mode == "single": sol_list = [x_yz[i].subs(zip(kernels, [sp.false] * len(kernels))) for i in range(len(x_yz))] sol = np.array(sol_list) sol_index = sol.nonzero()[0] p[node] = {node_order_col_[col_permutation_yz.index(i)] for i in sol_index} | {node} solved = True elif mode == "all": binary_combinations = product([0, 1], repeat=len(kernels)) for binary_combination in binary_combinations: sol_list = [x_yz[i].subs(zip(kernels, binary_combination)) for i in range(len(x_yz))] sol = np.array(sol_list) sol_index = sol.nonzero()[0] p_i = {node_order_col_[col_permutation_yz.index(i)] for i in sol_index} | {node} p[node].add(frozenset(p_i)) elif mode == "abstract": p_i = {} for i in range(len(x_yz)): node_temp = node_order_col_[col_permutation_yz.index(i)] p_i[node_temp] = x_yz[i] p_i[node] = sp.true p[node].append(p_i) if solved_update == set() and k > 0: if solved_nodes == nodes: return p, l_k return None, None bset = solved_nodes | solved_update return pauliflowaux(graph, iset, oset, meas_planes, k + 1, bset, bset, l_k, p, (l_x, l_y, l_z), mode)
[docs] def flow_from_pattern(pattern: Pattern) -> tuple[dict[int, set[int]], dict[int, int]]: """Check if the pattern has a valid flow. If so, return the flow and layers. Parameters ---------- pattern: Pattern pattern to be based on Returns ------- f: dict flow function. g[i] is the set of qubits to be corrected for the measurement of qubit i. l_k: dict layers obtained by flow algorithm. l_k[d] is a node set of depth d. """ if not pattern.is_standard(strict=True): raise ValueError("The pattern should be standardized first.") meas_planes = pattern.get_meas_plane() for plane in meas_planes.values(): if plane != Plane.XY: return None, None g = nx.Graph() nodes, edges = pattern.get_graph() g.add_nodes_from(nodes) g.add_edges_from(edges) input_nodes = pattern.input_nodes if not pattern.input_nodes else set() output_nodes = set(pattern.output_nodes) nodes = set(nodes) layers = pattern.get_layers() l_k = {} for l in layers[1]: for n in layers[1][l]: l_k[n] = l lmax = max(l_k.values()) if l_k else 0 for node, val in l_k.items(): l_k[node] = lmax - val + 1 for output_node in pattern.output_nodes: l_k[output_node] = 0 xflow, zflow = get_corrections_from_pattern(pattern) if verify_flow(g, input_nodes, output_nodes, xflow): # if xflow is valid zflow_from_xflow = {} for node, corrections in deepcopy(xflow).items(): cand = find_odd_neighbor(g, corrections) - {node} if cand: zflow_from_xflow[node] = cand if zflow_from_xflow != zflow: # if zflow is consistent with xflow return None, None return xflow, l_k return None, None
[docs] def gflow_from_pattern(pattern: Pattern) -> tuple[dict[int, set[int]], dict[int, int]]: """Check if the pattern has a valid gflow. If so, return the gflow and layers. Parameters ---------- pattern: Pattern pattern to be based on Returns ------- g: dict gflow function. g[i] is the set of qubits to be corrected for the measurement of qubit i. l_k: dict layers obtained by gflow algorithm. l_k[d] is a node set of depth d. """ if not pattern.is_standard(strict=True): raise ValueError("The pattern should be standardized first.") g = nx.Graph() nodes, edges = pattern.get_graph() g.add_nodes_from(nodes) g.add_edges_from(edges) input_nodes = set(pattern.input_nodes) if pattern.input_nodes else set() output_nodes = set(pattern.output_nodes) meas_planes = pattern.get_meas_plane() nodes = set(nodes) layers = pattern.get_layers() l_k = {} for l in layers[1]: for n in layers[1][l]: l_k[n] = l lmax = max(l_k.values()) if l_k else 0 for node, val in l_k.items(): l_k[node] = lmax - val + 1 for output_node in pattern.output_nodes: l_k[output_node] = 0 xflow, zflow = get_corrections_from_pattern(pattern) for node, plane in meas_planes.items(): if plane in {Plane.XZ, Plane.YZ}: if node not in xflow: xflow[node] = {node} xflow[node] |= {node} if verify_gflow(g, input_nodes, output_nodes, xflow, meas_planes): # if xflow is valid zflow_from_xflow = {} for node, corrections in deepcopy(xflow).items(): cand = find_odd_neighbor(g, corrections) - {node} if cand: zflow_from_xflow[node] = cand if zflow_from_xflow != zflow: # if zflow is consistent with xflow return None, None return xflow, l_k return None, None
[docs] def pauliflow_from_pattern(pattern: Pattern, mode="single") -> tuple[dict[int, set[int]], dict[int, int]]: """Check if the pattern has a valid Pauliflow. If so, return the Pauliflow and layers. Parameters ---------- pattern: Pattern pattern to be based on mode: str The Pauliflow finding algorithm can yield multiple equivalent solutions. So there are two options - "single": Returns a single solution - "all": Returns all possible solutions Optional. Default is "single". Returns ------- p: dict Pauli flow function. p[i] is the set of qubits to be corrected for the measurement of qubit i. l_k: dict layers obtained by Pauli flow algorithm. l_k[d] is a node set of depth d. """ if not pattern.is_standard(strict=True): raise ValueError("The pattern should be standardized first.") g = nx.Graph() nodes, edges = pattern.get_graph() nodes = set(nodes) g.add_nodes_from(nodes) g.add_edges_from(edges) input_nodes = set(pattern.input_nodes) if pattern.input_nodes else set() output_nodes = set(pattern.output_nodes) non_outputs = nodes - output_nodes meas_planes = pattern.get_meas_plane() meas_angles = pattern.get_angles() nodes = set(nodes) l_x, l_y, l_z = get_pauli_nodes(meas_planes, meas_angles) p_all, l_k = find_pauliflow(g, input_nodes, output_nodes, meas_planes, meas_angles, mode="all") if p_all is None: return None, None p = {} xflow, zflow = get_corrections_from_pattern(pattern) for node in non_outputs: xflow_node = xflow.get(node, set()) zflow_node = zflow.get(node, set()) p_list = list(p_all[node]) if node in p_all else [] valid = False for p_i in p_list: if xflow_node & p_i == xflow_node: ignored_nodes = p_i - xflow_node - {node} # check if nodes in ignored_nodes are measured in X or Y basis if ignored_nodes & (l_x | l_y) != ignored_nodes: continue odd_neighbers = find_odd_neighbor(g, p_i) if zflow_node & odd_neighbers == zflow_node: ignored_nodes = zflow_node - odd_neighbers - {node} # check if nodes in ignored_nodes are measured in Z or Y basis if ignored_nodes & (l_y | l_z) == ignored_nodes: valid = True if mode == "single": p[node] = set(p_i) break if mode == "all": if node not in p: p[node] = set() p[node].add(frozenset(p_i)) continue if not valid: return None, None return p, l_k
def get_corrections_from_pattern(pattern: Pattern) -> tuple[dict[int, set[int]], dict[int, set[int]]]: """Get x and z corrections from pattern. Parameters ---------- pattern: graphix.Pattern object pattern to be based on Returns ------- xflow: dict xflow function. xflow[i] is the set of qubits to be corrected in the X basis for the measurement of qubit i. zflow: dict zflow function. zflow[i] is the set of qubits to be corrected in the Z basis for the measurement of qubit i. """ nodes, _ = pattern.get_graph() nodes = set(nodes) xflow = {} zflow = {} for cmd in pattern: if cmd.kind == CommandKind.M: target = cmd.node xflow_source = cmd.s_domain & nodes zflow_source = cmd.t_domain & nodes for node in xflow_source: if node not in xflow: xflow[node] = set() xflow[node] |= {target} for node in zflow_source: if node not in zflow: zflow[node] = set() zflow[node] |= {target} if cmd.kind == CommandKind.X: target = cmd.node xflow_source = cmd.domain & nodes for node in xflow_source: if node not in xflow: xflow[node] = set() xflow[node] |= {target} if cmd.kind == CommandKind.Z: target = cmd.node zflow_source = cmd.domain & nodes for node in zflow_source: if node not in zflow: zflow[node] = set() zflow[node] |= {target} return xflow, zflow def search_neighbor(node: int, edges: set[tuple[int, int]]) -> set[int]: """Find neighborhood of node in edges. This is an ancillary method for `flowaux()`. Parameter ------- node: int target node number whose neighboring nodes will be collected edges: set of taples set of edges in the graph Outputs ------ N: list of ints neighboring nodes """ nb = set() for edge in edges: if node == edge[0]: nb |= {edge[1]} elif node == edge[1]: nb |= {edge[0]} return nb def get_min_depth(l_k: Mapping[int, int]) -> int: """Get minimum depth of graph. Parameters ---------- l_k: dict layers obtained by flow or gflow Returns ------- d: int minimum depth of graph """ return max(l_k.values()) def find_odd_neighbor(graph: nx.Graph[int], vertices: AbstractSet[int]) -> set[int]: """Return the set containing the odd neighbor of a set of vertices. Parameters ---------- graph : :class:`networkx.Graph` Underlying graph vertices : set set of nodes indices to find odd neighbors Returns ------- odd_neighbors : set set of indices for odd neighbor of set `vertices`. """ odd_neighbors = set() for vertex in vertices: neighbors = set(graph.neighbors(vertex)) odd_neighbors ^= neighbors return odd_neighbors def get_layers(l_k: Mapping[int, int]) -> tuple[int, dict[int, set[int]]]: """Get components of each layer. Parameters ---------- l_k: dict layers obtained by flow or gflow algorithms Returns ------- d: int minimum depth of graph layers: dict of set components of each layer """ d = get_min_depth(l_k) layers: dict[int, set[int]] = {k: set() for k in range(d + 1)} for i, val in l_k.items(): layers[val] |= {i} return d, layers def get_dependence_flow( inputs: set[int], flow: dict[int, set[int]], odd_flow: dict[int, set[int]], ) -> dict[int, set[int]]: """Get dependence flow from flow. Parameters ---------- inputs: set[int] set of input nodes flow: dict[int, set] flow function. flow[i] is the set of qubits to be corrected for the measurement of qubit i. odd_flow: dict[int, set] odd neighbors of flow or gflow. odd_flow[i] is the set of odd neighbors of f(i), Odd(f(i)). Returns ------- dependence_flow: dict[int, set] dependence flow function. dependence_flow[i] is the set of qubits to be corrected for the measurement of qubit i. """ dependence_flow = {u: set() for u in inputs} # concatenate flow and odd_flow combined_flow = {} for node, corrections in flow.items(): combined_flow[node] = corrections | odd_flow[node] for node, corrections in combined_flow.items(): for correction in corrections: if correction not in dependence_flow: dependence_flow[correction] = set() dependence_flow[correction] |= {node} return dependence_flow def get_dependence_pauliflow( inputs: set[int], flow: dict[int, set[int]], odd_flow: dict[int, set[int]], ls: tuple[set[int], set[int], set[int]], ): """Get dependence flow from Pauli flow. Parameters ---------- inputs: set[int] set of input nodes flow: dict[int, set[int]] Pauli flow function. p[i] is the set of qubits to be corrected for the measurement of qubit i. odd_flow: dict[int, set[int]] odd neighbors of Pauli flow or gflow. Odd(p(i)) ls: tuple ls = (l_x, l_y, l_z) where l_x, l_y, l_z are sets of qubits whose measurement operators are X, Y, Z, respectively. Returns ------- dependence_pauliflow: dict[int, set[int]] dependence flow function. dependence_pauliflow[i] is the set of qubits to be corrected for the measurement of qubit i. """ l_x, l_y, l_z = ls dependence_pauliflow = {u: set() for u in inputs} # concatenate p and odd_p combined_flow = {} for node, corrections in flow.items(): combined_flow[node] = (corrections - (l_x | l_y)) | (odd_flow[node] - (l_y | l_z)) for ynode in l_y: if ynode in corrections.symmetric_difference(odd_flow[node]): combined_flow[node] |= {ynode} for node, corrections in combined_flow.items(): for correction in corrections: if correction not in dependence_pauliflow: dependence_pauliflow[correction] = set() dependence_pauliflow[correction] |= {node} return dependence_pauliflow def get_layers_from_flow( flow: dict[int, set], odd_flow: dict[int, set], inputs: set[int], outputs: set[int], ls: tuple[set[int], set[int], set[int]] | None = None, ) -> tuple[dict[int, set], int]: """Get layers from flow (incl. gflow, Pauli flow). Parameters ---------- flow: dict[int, set] flow function. flow[i] is the set of qubits to be corrected for the measurement of qubit i. odd_flow: dict[int, set] odd neighbors of flow or gflow. Odd(f(node)) inputs: set set of input nodes outputs: set set of output nodes ls: tuple ls = (l_x, l_y, l_z) where l_x, l_y, l_z are sets of qubits whose measurement operators are X, Y, Z, respectively. If not None, the layers are obtained based on Pauli flow. Returns ------- layers: dict[int, set] layers obtained from flow depth: int depth of the layers Raises ------ ValueError If the flow is not valid(e.g. there is no partial order). """ layers = {} depth = 0 if ls is None: dependence_flow = get_dependence_flow(inputs, odd_flow, flow) else: dependence_flow = get_dependence_pauliflow(inputs, flow, odd_flow, ls) left_nodes = set(flow.keys()) for output in outputs: if output in left_nodes: raise ValueError("Invalid flow") while True: layers[depth] = set() for node in left_nodes: if node not in dependence_flow or len(dependence_flow[node]) == 0 or dependence_flow[node] == {node}: layers[depth] |= {node} left_nodes -= layers[depth] for node in left_nodes: dependence_flow[node] -= layers[depth] if len(layers[depth]) == 0: if len(left_nodes) == 0: layers[depth] = outputs depth += 1 break raise ValueError("Invalid flow") depth += 1 return layers, depth def get_adjacency_matrix(graph: nx.Graph) -> tuple[MatGF2, list[int]]: """Get adjacency matrix of the graph. Parameters ---------- graph : :class:`networkx.Graph` Graph whose adjacency matrix is computed. Returns ------- adjacency_matrix: graphix.linalg.MatGF2 adjacency matrix of the graph. the matrix is defined on GF(2) field. node_list: list ordered list of nodes. ``node_list[i]`` is the node label of the i-th row/column of the adjacency matrix. """ node_list = list(graph.nodes) node_list.sort() adjacency_matrix = nx.to_numpy_array(graph, nodelist=node_list) adjacency_matrix = MatGF2(adjacency_matrix.astype(int)) return adjacency_matrix, node_list
[docs] def verify_flow( graph: nx.Graph, iset: set[int], oset: set[int], flow: dict[int, set], meas_planes: dict[int, Plane] | None = None, ) -> bool: """Check whether the flow is valid. Parameters ---------- graph: :class:`networkx.Graph` Graph (incl. input and output) flow: dict[int, set] flow function. flow[i] is the set of qubits to be corrected for the measurement of qubit i. meas_planes: dict[int, str] optional: measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. Returns ------- valid_flow: bool True if the flow is valid. False otherwise. """ if meas_planes is None: meas_planes = {} check_meas_planes(meas_planes) valid_flow = True non_outputs = set(graph.nodes) - oset # if meas_planes is given, check whether all measurement planes are "XY" for node, plane in meas_planes.items(): if plane != Plane.XY or node not in non_outputs: return False odd_flow = {node: find_odd_neighbor(graph, corrections) for node, corrections in flow.items()} try: _, _ = get_layers_from_flow(flow, odd_flow, iset, oset) except ValueError: return False # check if v ~ f(v) for each node edges = set(graph.edges) for node, corrections in flow.items(): if len(corrections) > 1: return False correction = next(iter(corrections)) if (node, correction) not in edges and (correction, node) not in edges: return False return valid_flow
[docs] def verify_gflow( graph: nx.Graph, iset: set[int], oset: set[int], gflow: dict[int, set], meas_planes: dict[int, Plane], ) -> bool: """Check whether the gflow is valid. Parameters ---------- graph: :class:`networkx.Graph` Graph (incl. input and output) iset: set set of node labels for input oset: set set of node labels for output gflow: dict[int, set] gflow function. gflow[i] is the set of qubits to be corrected for the measurement of qubit i. .. seealso:: :func:`find_gflow` meas_planes: dict[int, str] measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. Returns ------- valid_gflow: bool True if the gflow is valid. False otherwise. """ check_meas_planes(meas_planes) valid_gflow = True non_outputs = set(graph.nodes) - oset odd_flow = {} for non_output in non_outputs: if non_output not in gflow: gflow[non_output] = set() odd_flow[non_output] = set() else: odd_flow[non_output] = find_odd_neighbor(graph, gflow[non_output]) try: _, _ = get_layers_from_flow(gflow, odd_flow, iset, oset) except ValueError: return False # check for each measurement plane for node, plane in meas_planes.items(): # index = node_order.index(node) if plane == Plane.XY: valid_gflow &= (node not in gflow[node]) and (node in odd_flow[node]) elif plane == Plane.XZ: valid_gflow &= (node in gflow[node]) and (node in odd_flow[node]) elif plane == Plane.YZ: valid_gflow &= (node in gflow[node]) and (node not in odd_flow[node]) return valid_gflow
[docs] def verify_pauliflow( graph: nx.Graph, iset: set[int], oset: set[int], pauliflow: dict[int, set[int]], meas_planes: dict[int, Plane], meas_angles: dict[int, float], ) -> bool: """Check whether the Pauliflow is valid. Parameters ---------- graph: :class:`networkx.Graph` Graph (incl. input and output) iset: set set of node labels for input oset: set set of node labels for output pauliflow: dict[int, set] Pauli flow function. pauliflow[i] is the set of qubits to be corrected for the measurement of qubit i. meas_planes: dict[int, Plane] measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i. meas_angles: dict[int, float] measurement angles for each qubits. meas_angles[i] is the measurement angle for qubit i. Returns ------- valid_pauliflow: bool True if the Pauliflow is valid. False otherwise. """ check_meas_planes(meas_planes) l_x, l_y, l_z = get_pauli_nodes(meas_planes, meas_angles) valid_pauliflow = True non_outputs = set(graph.nodes) - oset odd_flow = {} for non_output in non_outputs: if non_output not in pauliflow: pauliflow[non_output] = set() odd_flow[non_output] = set() else: odd_flow[non_output] = find_odd_neighbor(graph, pauliflow[non_output]) try: layers, depth = get_layers_from_flow(pauliflow, odd_flow, iset, oset, (l_x, l_y, l_z)) except ValueError: return False node_order = [] for d in range(depth): node_order.extend(list(layers[d])) for node, plane in meas_planes.items(): if node in l_x: valid_pauliflow &= node in odd_flow[node] elif node in l_z: valid_pauliflow &= node in pauliflow[node] elif node in l_y: valid_pauliflow &= node in pauliflow[node].symmetric_difference(odd_flow[node]) elif plane == Plane.XY: valid_pauliflow &= (node not in pauliflow[node]) and (node in odd_flow[node]) elif plane == Plane.XZ: valid_pauliflow &= (node in pauliflow[node]) and (node in odd_flow[node]) elif plane == Plane.YZ: valid_pauliflow &= (node in pauliflow[node]) and (node not in odd_flow[node]) return valid_pauliflow
def get_input_from_flow(flow: dict[int, set]) -> set: """Get input nodes from flow. Parameters ---------- flow: dict[int, set] flow function. flow[i] is the set of qubits to be corrected for the measurement of qubit i. Returns ------- inputs: set set of input nodes """ non_output = set(flow.keys()) for correction in flow.values(): non_output -= correction return non_output def get_output_from_flow(flow: dict[int, set]) -> set: """Get output nodes from flow. Parameters ---------- flow: dict[int, set] flow function. flow[i] is the set of qubits to be corrected for the measurement of qubit i. Returns ------- outputs: set set of output nodes """ non_outputs = set(flow.keys()) non_inputs = set() for correction in flow.values(): non_inputs |= correction return non_inputs - non_outputs def get_pauli_nodes( meas_planes: dict[int, Plane], meas_angles: Mapping[int, ExpressionOrFloat] ) -> tuple[set[int], set[int], set[int]]: """Get sets of nodes measured in X, Y, Z basis. Parameters ---------- meas_planes: dict[int, Plane] measurement planes for each node. meas_angles: dict[int, float] measurement angles for each node. Returns ------- l_x: set set of nodes measured in X basis. l_y: set set of nodes measured in Y basis. l_z: set set of nodes measured in Z basis. """ check_meas_planes(meas_planes) l_x, l_y, l_z = set(), set(), set() for node, plane in meas_planes.items(): pm = PauliMeasurement.try_from(plane, meas_angles[node]) if pm is None: continue if pm.axis == Axis.X: l_x |= {node} elif pm.axis == Axis.Y: l_y |= {node} elif pm.axis == Axis.Z: l_z |= {node} else: assert_never(pm.axis) return l_x, l_y, l_z