Source code for graphix.sim.mps

import numpy as np
import tensornetwork as tn
import networkx as nx
from graphix.clifford import CLIFFORD
from graphix.sim.statevec import meas_op
from copy import copy


[docs]class MPS: """Matrix Product Simulator for MBQC Executes the measurement pattern. This is a simple implementation. Improved CPU/GPU MPS simulator will be released soon. """
[docs] def __init__(self, pattern, singular_value=None, max_truncation_err=None, graph_prep="opt"): """ Parameters ---------- pattern : graphix.Pattern MBQC command sequence to be simulated singular_value: int cut off threshold for SVD decomposition truncation_err: float cut off threshold for SVD decomposition. truncate maximum number of singular values within truncation_err graph_prep : str 'sequential' for standard method, 'opt' for faster method """ nodes, edges = pattern.get_graph() G = nx.Graph() G.add_nodes_from(nodes) G.add_edges_from(edges) self.ptn = pattern self.results = copy(pattern.results) self.graph = G # dict of tensornetwork.Node, index should be int self.nodes = dict() self.state = None self.singular_value = singular_value self.truncation_err = max_truncation_err self.graph_prep = graph_prep # accumulated truncation square error self.accumulated_err = 0.0
[docs] def set_singular_value(self, chi): """Set the number of singular values holding under singular value decomposition(SVD). Note: If you choose 'opt' as a graph_prep, you don't have to specify the singular_value. Parameters ---------- chi(int): number of singular values holding when SVD executed """ self.singular_value = chi
[docs] def set_truncation_err(self, truncation_err): """Set the max truncation error under SVD. Note: If you choose 'opt' as a graph_prep, you don't have to specify the truncation_err. Parameters ---------- truncation_err (float): Max truncation error allowed under SVD. Maximum number of singular values keeping truncation error under specified value will be possessed. """ self.truncation_err = truncation_err
[docs] def count_maxE(self): """Count the max number of edges per a node. When maxE is large number, huge memory(2^maxE) will be used. Returns ------- int: The max number of edges. """ maxE = 0 for node in self.graph.nodes: n = len(self.graph[node]) if n > maxE: maxE = n return maxE
[docs] def add_nodes(self, cmds): """add qubits to node sets from command sequence""" if self.graph_prep == "sequential": for cmd in cmds: self.add_node(cmd[1]) elif self.graph_prep == "opt": pass
[docs] def add_node(self, n): """Internal method of run_cmd(). Add new qubit to a node set of MPS. Parameters ---------- n (int): Site index of the new node. """ neighbor = self.graph.neighbors(n) dim = [2] axis_names = [str(n)] for neighbor_node in neighbor: dim.append(1) axis_names.append(str(neighbor_node)) node = tn.Node(np.ones(dim), str(n), axis_names) self.nodes[n] = node
[docs] def entangle_nodes(self, edge): """Make entanglement between nodes specified by edge. Contract non-dangling edges in this process. Optimized contraction will be implemented in a later release. Parameters ---------- edge (taple of ints): edge specifies two nodes applied CZ gate. """ if self.graph_prep == "sequential": # prepare CZ operator cz = tn.Node( np.array( [ [[[1.0, 0.0], [0.0, 0.0]], [[0.0, 1.0], [0.0, 0.0]]], [[[0.0, 0.0], [1.0, 0.0]], [[0.0, 0.0], [0.0, -1.0]]], ] ), name="cz", axis_names=["c1", "c2", "c3", "c4"], ) # call nodes from nodes list node1 = self.nodes[edge[0]] node2 = self.nodes[edge[1]] # contract the edge between nodes cont_edge = node1[str(edge[1])] ^ node2[str(edge[0])] axis_name1 = copy(node1.axis_names) axis_name2 = copy(node2.axis_names) axis_name1.remove(str(edge[1])) axis_name2.remove(str(edge[0])) connected = tn.contract(cont_edge, name="connected", axis_names=axis_name1 + axis_name2) connected[str(edge[0])] ^ cz["c1"] connected[str(edge[1])] ^ cz["c2"] connected_rem = copy(connected.edges) connected_rem.remove(connected[str(edge[0])]) connected_rem.remove(connected[str(edge[1])]) connected_axis_names = copy(connected.axis_names) connected_axis_names.remove(str(edge[0])) connected_axis_names.remove(str(edge[1])) # apply entangle operatorn and rename edges entangled = tn.contract_between( connected, cz, name="entangled", output_edge_order=[cz["c3"], cz["c4"]] + connected_rem, axis_names=[str(edge[0]), str(edge[1])] + connected_axis_names, ) leftedges = [] for name in axis_name1: leftedges.append(entangled[name]) rightedges = [] for name in axis_name2: rightedges.append(entangled[name]) # separate 4rank tensor to two 3rank tensors node1_new, node2_new, truncation_errs = tn.split_node( entangled, left_edges=leftedges, right_edges=rightedges, max_singular_values=self.singular_value, max_truncation_err=self.truncation_err, left_name=node1.name, right_name=node2.name, edge_name="contracted", ) # update the nodes node1_new.axis_names = axis_name1 + [str(edge[1])] node2_new.axis_names = [str(edge[0])] + axis_name2 self.nodes[edge[0]] = node1_new self.nodes[edge[1]] = node2_new self.accumulated_err += np.linalg.norm(truncation_errs) assert node1_new[node1.name].is_dangling() assert node2_new[node2.name].is_dangling() elif self.graph_prep == "opt": pass
[docs] def initialize(self): """initialize the internal MPS state""" if self.graph_prep == "sequential": self.make_initial() elif self.graph_prep == "opt": self.make_graph_state()
[docs] def make_initial(self): """This is an internal method of run_cmd. Prepare a graph state sequentially by applying CZ gates. """ # prepare nodes for n in self.graph.nodes: self.add_node(n) # make entanglement for edge in self.graph.edges: self.entangle_nodes(edge)
def finalize(self): self.state = self
[docs] def make_graph_state(self): """Prepare a graph state with efficient expression, instead of using CZ gates. This is is an internal method of run_cmd(). .. seealso:: :meth:`~graphix.sim.mps.make_initial()` """ # basic vectors plus = np.array([1.0 / np.sqrt(2), 1.0 / np.sqrt(2)]) minus = np.array([1.0 / np.sqrt(2), -1.0 / np.sqrt(2)]) zero = np.array([1.0, 0.0]) one = np.array([0.0, 1.0]) for site in self.graph.nodes: neighbor = self.graph.neighbors(site) A0 = [] A1 = [] axis_names = [str(site)] edge_order0 = [] edge_order1 = [] for n in neighbor: if n > site: zero_node_cp = tn.Node(zero) one_node_cp = tn.Node(one) A0.append(zero_node_cp) A1.append(one_node_cp) edge_order0.append(zero_node_cp[0]) edge_order1.append(one_node_cp[0]) elif n < site: plus_node_cp = tn.Node(plus) minus_node_cp = tn.Node(minus) A0.append(plus_node_cp) A1.append(minus_node_cp) edge_order0.append(plus_node_cp[0]) edge_order1.append(minus_node_cp[0]) axis_names.append(str(n)) if len(A0) * len(A1): tensor = np.array( [ tn.outer_product_final_nodes(A0, edge_order0).tensor, tn.outer_product_final_nodes(A1, edge_order1).tensor, ] ) else: # branch for not concatenated graph tensor = np.array([1.0 / np.sqrt(2), 1.0 / np.sqrt(2)]) node = tn.Node(tensor, str(site), axis_names=axis_names) self.nodes[site] = node # connecting all edges for edge in self.graph.edges: node1 = self.nodes[edge[0]] node2 = self.nodes[edge[1]] node1[str(edge[1])] ^ node2[str(edge[0])]
[docs] def measure(self, cmd): """Perform measurement of a node. In MPS, to apply measurement operator to the tensor, consisting Matrix Product State, equals to perform measurement. Parameters ---------- cmd (list): measurement command : ['M', node, plane angle, s_domain, t_domain] """ # choose the measurement result randomly result = np.random.choice([0, 1]) self.results[cmd[1]] = result # extract signals for adaptive angle s_signal = np.sum([self.results[j] for j in cmd[4]]) t_signal = np.sum([self.results[j] for j in cmd[5]]) angle = cmd[3] * np.pi * (-1) ** s_signal + np.pi * t_signal if len(cmd) == 7: m_op = meas_op(angle, vop=cmd[6], plane=cmd[2], choice=result) else: m_op = meas_op(angle, plane=cmd[2], choice=result) # the procedure described below tends to keep the norm of MPS buffer = 2**0.5 m_op = m_op * buffer node_op = tn.Node(m_op) self.apply_one_site_operator(cmd[1], node_op)
[docs] def correct_byproduct(self, cmd): """Perform byproduct correction. Parameters ---------- cmd (list): correct for the X or Z byproduct operators, by applying the X or Z gate. """ if np.mod(np.sum([self.results[j] for j in cmd[2]]), 2) == 1: if cmd[0] == "X": op = np.array([[0.0, 1.0], [1.0, 0.0]]) elif cmd[0] == "Z": op = np.array([[1.0, 0.0], [0.0, -1.0]]) node_op = tn.Node(op) self.apply_one_site_operator(cmd[1], node_op)
[docs] def apply_clifford(self, cmd): """Apply single-qubit Clifford gate, specified by vop index specified in graphix.clifford.CLIFFORD Parameters ---------- cmd (list): clifford command. See {Ref} graphix-paper for the detail. ToDo """ node_op = tn.Node(CLIFFORD[cmd[2]]) self.apply_one_site_operator(cmd[1], node_op)
[docs] def apply_one_site_operator(self, loc, node_op): """Internal method for measure, correct_byproduct, and apply_clifford. Apply one site operator to a node. Parameters ---------- loc (int): site number. node_op (tn.Node): one site operator. """ node = self.nodes[loc] node[str(loc)] ^ node_op[1] edges = copy(node.edges) edges.remove(node[str(loc)]) axis_names = copy(node.axis_names) axis_names.remove(str(loc)) applied = tn.contract_between( node, node_op, name=node.name, output_edge_order=[node_op[0]] + edges, axis_names=[str(loc)] + axis_names ) self.nodes[loc] = applied
[docs] def replicate_node_dict(self, node_dict, conjugate=False): """Replicate dictionary of nodes. Parameters ---------- node_dict (dic of tensornetwork.Node): copying dictionary of nodes(e.g. self.nodes) Returns ------- dict of tensornetwork.Node: replicated node dictionary """ rep_list = tn.replicate_nodes(node_dict.values(), conjugate=conjugate) rep_dict = {node.name: node for node in rep_list} return rep_dict
[docs] def expectation_value(self, op, qargs): """calculate expectation value of given operator. Parameters ---------- op (numpy.ndarray): Expectation value is calculated based on op. qargs (list of ints): applied positions of logical qubits. Returns ------- expectation_value : float """ state = self.replicate_node_dict(self.nodes) dim = int(np.log2(len(op))) shape = [2 for _ in range(2 * dim)] sites = [self.ptn.output_nodes[i] for i in qargs] axis_names_in = ["in" + str(site) for site in sites] axis_names_out = [str(site) for site in sites] axis_names = axis_names_out + axis_names_in node_op = tn.Node(op.reshape(shape), axis_names=axis_names) # replicate nodes for calculating expectation value rep = self.replicate_node_dict(self.nodes, conjugate=True) rep_norm = self.replicate_node_dict(self.nodes) rep_norm2 = self.replicate_node_dict(self.nodes, conjugate=True) # connect given operators to sites concatenated_nodes = set() for site in sites: concatenated_nodes |= nx.shortest_path(self.graph, site).keys() contraction_set = set() for site in concatenated_nodes: if site in sites: node_op["in" + str(site)] ^ state[str(site)][str(site)] node_op[str(site)] ^ rep[str(site)][str(site)] else: state[str(site)][str(site)] ^ rep[str(site)][str(site)] contraction_set |= {state[str(site)]} | {rep[str(site)]} expectation_value = tn.contractors.auto(list(contraction_set) + [node_op]).tensor # calculate norm of TN norm_contraction_list = [] for site in concatenated_nodes: rep_norm[str(site)][str(site)] ^ rep_norm2[str(site)][str(site)] norm_contraction_list += [rep_norm[str(site)], rep_norm2[str(site)]] norm = tn.contractors.auto(norm_contraction_list).tensor expectation_value = expectation_value / norm return expectation_value
[docs] def expectation_value_ops(self, ops, qargs): """calculate expectation value of given operators. This command is mainly used for retrieving a probability distribution. Parameters ---------- ops (list of numpy.ndarray): Expectation value is calculated based on ops. For constructing statevector, ops are projection operators fro each site. qargs (list of ints): applied positions of logical qubits. Returns ------- expectation value : float """ state = self.replicate_node_dict(self.nodes) sites = [self.ptn.output_nodes[i] for i in qargs] node_ops = dict() for i in range(len(sites)): node_op = tn.Node(ops[i], axis_names=["in" + str(sites[i])] + [str(sites[i])]) node_ops[sites[i]] = node_op # replicate nodes for calculating expectation value rep = self.replicate_node_dict(self.nodes, conjugate=True) rep_norm = self.replicate_node_dict(self.nodes) rep_norm2 = self.replicate_node_dict(self.nodes, conjugate=True) # connecting given op to sites concatenated_nodes = set() for site in sites: concatenated_nodes |= nx.shortest_path(self.graph, site).keys() contraction_set = set() for site in concatenated_nodes: if site in sites: state[str(site)][str(site)] ^ node_ops[site]["in" + str(site)] node_ops[site][str(site)] ^ rep[str(site)][str(site)] else: state[str(site)][str(site)] ^ rep[str(site)][str(site)] contraction_set |= {state[str(site)]} | {rep[str(site)]} expectation_value = tn.contractors.auto(list(contraction_set) + list(node_ops.values())).tensor # calculate norm of TN norm_contraction_list = [] for site in concatenated_nodes: rep_norm[str(site)][str(site)] ^ rep_norm2[str(site)][str(site)] norm_contraction_list += [rep_norm[str(site)], rep_norm2[str(site)]] norm = tn.contractors.auto(norm_contraction_list).tensor expectation_value = expectation_value / norm return expectation_value
[docs] def get_amplitude(self, number): """calculate a probability amplitude of the specified state. Parameters ---------- number (int): specifies a state which one wants to know a probability amplitude e.g. |0000> corresponds to 0. |1010> corresponds to 10. Returns ------- float: the probability amplitude of the specified state. """ proj_to_0 = np.array([[1.0, 0.0], [0.0, 0.0]]) proj_to_1 = np.array([[0.0, 0.0], [0.0, 1.0]]) sites = self.ptn.output_nodes assert number < 2 ** len(sites) ops = [] for i in range(len(sites)): exp = len(sites) - 1 - i if (number // 2**exp) == 1: op = proj_to_1 number -= 2**exp else: op = proj_to_0 ops.append(op) qargs = range(len(sites)) probability = self.expectation_value_ops(ops, qargs) return abs(probability)