"""flow finding algorithm
For a given underlying graph (G, I, O, meas_plane), this method finds a (generalized) flow [NJP 9, 250 (2007)]
in polynomincal time.
In particular, this outputs gflow with minimum depth, maximally delayed gflow.
Ref: Mhalla and Perdrix, International Colloquium on Automata,
Languages, and Programming (Springer, 2008), pp. 857-868.
Ref: Backens et al., Quantum 5, 421 (2021).
"""
from itertools import product
import networkx as nx
import numpy as np
import sympy as sp
from graphix.linalg import MatGF2
[docs]def gflow(graph, input, output, meas_planes, mode="single"):
"""Maximally delayed gflow finding algorithm
For open graph g with input, output, and measurement planes, this returns maximally delayed gflow.
gflow consist of function g(i) where i is the qubit labels,
and strict partial ordering < or layers labels l_k where each element
specify the order of qubits to be measured to maintain determinism in MBQC.
In practice, we must measure qubits in order specified in array l_k (increasing order
of l_k from 1), and for each measurements of qubit i we must perform corrections on
qubits in g(i), depending on the measurement outcome.
For more details of gflow, see Browne et al., NJP 9, 250 (2007).
We use the extended gflow finding algorithm in Backens et al., Quantum 5, 421 (2021).
Parameters
----------
graph: nx.Graph
graph (incl. in and out)
input: set
set of node labels for input
output: set
set of node labels for output
meas_planes: dict
measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i.
mode: str(optional)
The gflow finding algorithm can yield multiple equivalent solutions. So there are three options
- "single": Returrns a single solution
- "all": Returns all possible solutions
- "abstract": Returns an abstract solution. Uncertainty is represented with sympy.Symbol objects,
requiring user substitution to get a concrete answer.
Default is "single".
Returns
-------
g: dict
gflow function. g[i] is the set of qubits to be corrected for the measurement of qubit i.
l_k: dict
layers obtained by gflow algorithm. l_k[d] is a node set of depth d.
"""
l_k = dict()
g = dict()
for node in graph.nodes:
l_k[node] = 0
return gflowaux(graph, input, output, meas_planes, 1, l_k, g, mode=mode)
def gflowaux(
graph,
input: set,
output: set,
meas_planes: dict,
k: int,
l_k: dict,
g: dict,
mode,
):
"""Function to find one layer of the gflow.
Ref: Backens et al., Quantum 5, 421 (2021).
Parameters
----------
graph: nx.Graph
graph (incl. in and out)
input: set
set of node labels for input
output: set
set of node labels for output
meas_planes: dict
measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i.
k: int
current layer number.
l_k: dict
layers obtained by gflow algorithm. l_k[d] is a node set of depth d.
g: dict
gflow function. g[i] is the set of qubits to be corrected for the measurement of qubit i.
mode: str(optional)
The gflow finding algorithm can yield multiple equivalent solutions. So there are three options
- "single": Returrns a single solution
- "all": Returns all possible solutions
- "abstract": Returns an abstract solution. Uncertainty is represented with sympy.Symbol objects,
requiring user substitution to get a concrete answer.
Returns
-------
g: dict
gflow function. g[i] is the set of qubits to be corrected for the measurement of qubit i.
l_k: dict
layers obtained by gflow algorithm. l_k[d] is a node set of depth d.
"""
nodes = set(graph.nodes)
if output == nodes:
return g, l_k
non_output = nodes - output
correction_candidate = output - input
adj_mat, node_order_list = get_adjacency_matrix(graph)
node_order_row = node_order_list.copy()
node_order_row.sort()
node_order_col = node_order_list.copy()
node_order_col.sort()
for out in output:
adj_mat.remove_row(node_order_row.index(out))
node_order_row.remove(out)
adj_mat_row_reduced = adj_mat.copy() # later use for construct RHS
for node in nodes - correction_candidate:
adj_mat.remove_col(node_order_col.index(node))
node_order_col.remove(node)
b = MatGF2(np.zeros((adj_mat.data.shape[0], len(non_output)), dtype=int))
for i_row in range(len(node_order_row)):
node = node_order_row[i_row]
vec = MatGF2(np.zeros(len(node_order_row), dtype=int))
if meas_planes[node] == "XY":
vec.data[i_row] = 1
elif meas_planes[node] == "XZ":
vec.data[i_row] = 1
vec_add = adj_mat_row_reduced.data[:, node_order_list.index(node)]
vec = vec + vec_add
elif meas_planes[node] == "YZ":
vec.data = adj_mat_row_reduced.data[:, i_row].reshape(vec.data.shape)
b.data[:, i_row] = vec.data
adj_mat, b, _, col_pertumutation = adj_mat.forward_eliminate(b)
x, kernels = adj_mat.backward_substitute(b)
corrected_nodes = set()
for i_row in range(len(node_order_row)):
non_out_node = node_order_row[i_row]
x_col = x[:, i_row]
if x_col[0] == sp.nan: # no solution
continue
if mode == "single":
sol_list = [x_col[i].subs(zip(kernels, [sp.false] * len(kernels))) for i in range(len(x_col))]
sol = np.array(sol_list)
sol_index = sol.nonzero()[0]
g[non_out_node] = set(node_order_col[col_pertumutation[i]] for i in sol_index)
if meas_planes[non_out_node] in ["XZ", "YZ"]:
g[non_out_node] |= {non_out_node}
elif mode == "all":
g[non_out_node] = set()
binary_combinations = product([0, 1], repeat=len(kernels))
for binary_combination in binary_combinations:
sol_list = [x_col[i].subs(zip(kernels, binary_combination)) for i in range(len(x_col))]
kernel_list = [True if i == 1 else False for i in binary_combination]
sol_list.extend(kernel_list)
sol = np.array(sol_list)
sol_index = sol.nonzero()[0]
g_i = set(node_order_col[col_pertumutation[i]] for i in sol_index)
if meas_planes[non_out_node] in ["XZ", "YZ"]:
g_i |= {non_out_node}
g[non_out_node] |= {frozenset(g_i)}
elif mode == "abstract":
g[non_out_node] = dict()
for i in range(len(x_col)):
node = node_order_col[col_pertumutation[i]]
g[non_out_node][node] = x_col[i]
for i in range(len(kernels)):
g[non_out_node][node_order_col[col_pertumutation[len(x_col) + i]]] = kernels[i]
if meas_planes[non_out_node] in ["XZ", "YZ"]:
g[non_out_node][non_out_node] = sp.true
l_k[non_out_node] = k
corrected_nodes |= {non_out_node}
if len(corrected_nodes) == 0:
if output == nodes:
return g, l_k
else:
return None, None
else:
return gflowaux(
graph,
input,
output | corrected_nodes,
meas_planes,
k + 1,
l_k,
g,
mode=mode,
)
[docs]def flow(graph, input, output, meas_planes=None):
"""Causal flow finding algorithm
For open graph g with input, output, and measurement planes, this returns causal flow.
For more detail of causal flow, see Danos and Kashefi, PRA 74, 052310 (2006).
Original algorithm by Mhalla and Perdrix,
International Colloquium on Automata, Languages, and Programming (2008),
pp. 857-868.
Parameters
----------
graph: nx.Graph
graph (incl. in and out)
input: set
set of node labels for input
output: set
set of node labels for output
meas_planes: int(optional)
measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i.
Note that an underlying graph has a causal flow only if all measurement planes are "XY".
If not specified, all measurement planes are interpreted as "XY".
Returns
-------
f: list of nodes
causal flow function. f[i] is the qubit to be measured after qubit i.
l_k: dict
layers obtained by gflow algorithm. l_k[d] is a node set of depth d.
"""
nodes = set(graph.nodes)
edges = set(graph.edges)
if meas_planes is None:
meas_planes = {i: "XY" for i in (nodes - output)}
for plane in meas_planes.values():
if plane not in ["X", "Y", "XY"]:
return None, None
l_k = {i: 0 for i in nodes}
f = dict()
k = 1
v_c = output - input
return flowaux(nodes, edges, input, output, v_c, f, l_k, k)
def flowaux(nodes, edges, input, output, v_c, f, l_k, k):
"""Function to find one layer of the flow.
Ref: Mhalla and Perdrix, International Colloquium on Automata,
Languages, and Programming (Springer, 2008), pp. 857-868.
Parameters
----------
nodes: set
labels of all qubits (nodes)
edges: set
edges
input: set
set of node labels for input
output: set
set of node labels for output
v_c: set
correction candidate qubits
f: dict
flow function. f[i] is the qubit to be measured after qubit i.
l_k: dict
layers obtained by flow algorithm. l_k[d] is a node set of depth d.
k: int
current layer number.
meas_planes: dict
measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i.
Outputs
-------
f: list of nodes
causal flow function. f[i] is the qubit to be measured after qubit i.
l_k: dict
layers obtained by gflow algorithm. l_k[d] is a node set of depth d.
"""
v_out_prime = set()
c_prime = set()
for q in v_c:
N = search_neighbor(q, edges)
p_set = N & (nodes - output)
if len(p_set) == 1:
p = list(p_set)[0]
f[p] = q
l_k[p] = k
v_out_prime = v_out_prime | {p}
c_prime = c_prime | {q}
# determine whether there exists flow
if not v_out_prime:
if output == nodes:
return f, l_k
else:
return None, None
return flowaux(
nodes,
edges,
input,
output | v_out_prime,
(v_c - c_prime) | (v_out_prime & (nodes - input)),
f,
l_k,
k + 1,
)
def search_neighbor(node, edges):
"""Function to find neighborhood of node in edges. This is an ancillary method for `flowaux()`.
Parameter
-------
node: int
target node number whose neighboring nodes will be collected
edges: list of taples
set of edges in the graph
Outputs
------
N: list of ints
neighboring nodes
"""
N = set()
for edge in edges:
if node == edge[0]:
N = N | {edge[1]}
elif node == edge[1]:
N = N | {edge[0]}
return N
def find_flow(graph, input, output, meas_planes=None, mode="single"):
"""Function to determine whether there exists flow or gflow
Parameters
---------
graph: nx.Graph
graph (incl. in and out)
input: set
set of node labels for input
output: set
set of node labels for output
meas_planes: dict(optional)
measurement planes for each qubits. meas_planes[i] is the measurement plane for qubit i.
mode: str(optional)
This is the option for gflow.
The gflow finding algorithm can yield multiple equivalent solutions. so there are three options
- "single": Returrns a single solution
- "all": Returns all possible solutions
- "abstract": Returns an abstract solution. Uncertainty is represented with sympy.Symbol objects,
requiring user substitution to get a concrete answer.
"""
if meas_planes is None:
meas_planes = {i: "XY" for i in (set(graph.nodes) - output)}
f, l_k = flow(graph, input, output, meas_planes)
if f:
print("flow found")
print("f is ", f)
print("l_k is ", l_k)
else:
print("no flow found, finding gflow")
g, l_k = gflow(graph, input, output, meas_planes, mode=mode)
if g:
print("gflow found")
print("g is ", g)
print("l_k is ", l_k)
else:
print("no gflow found")
def get_min_depth(l_k):
"""get minimum depth of graph.
Parameters
----------
l_k: dict
layers obtained by flow or gflow
Returns
-------
d: int
minimum depth of graph
"""
return max(l_k.values())
def find_odd_neighbor(graph, candidate, vertices):
"""Returns the list containing the odd neighbor of a set of vertices.
Parameters
----------
graph : networkx.Graph
underlying graph.
candidate : iterable
possible odd neighbors
vertices : set
set of nodes indices to find odd neighbors
Returns
-------
out : list
list of indices for odd neighbor of set `vertices`.
"""
out = []
for c in candidate:
if np.mod(len(set(graph.neighbor(c)) ^ vertices), 2) == 1:
out.append(c)
return out
def get_layers(l_k):
"""get components of each layer.
Parameters
-------
l_k: dict
layers obtained by flow or gflow algorithms
Returns
-------
d: int
minimum depth of graph
layers: dict of lists
components of each layer
"""
d = get_min_depth(l_k)
layers = {k: [] for k in range(d + 1)}
for i in l_k.keys():
layers[l_k[i]].append(i)
return d, layers
def get_adjacency_matrix(graph):
"""Get adjacency matrix of the graph
Returns
-------
adjacency_matrix: graphix.linalg.MatGF2
adjacency matrix of the graph. the matrix is defined on GF(2) field.
node_list: list
ordered list of nodes. node_list[i] is the node label of i-th row/column of the adjacency matrix.
"""
node_list = list(graph.nodes)
node_list.sort()
adjacency_matrix = nx.to_numpy_array(graph, nodelist=node_list)
adjacency_matrix = MatGF2(adjacency_matrix.astype(int))
return adjacency_matrix, node_list