Non-existent parameter in the split_branch() function of the short-circuit driver
Bug description The function split_branch() used for calculations of short-circuits at a predefined distance from the line attempts to access the conductance property of the branch (G), which doesn't exist for lines. This property is not used later in the function. In addition, the function attempts to modify bus.Zf property which doesn't exist.
To reproduce import VeraGridEngine as vge import os
folder = os.path.join('..', 'Grids_and_profiles', 'grids') fname = os.path.join(folder, 'South Island of New Zealand.veragrid')
network = vge.open_file(filename=fname)
pf_options = vge.PowerFlowOptions() pf = vge.PowerFlowDriver(network, pf_options) pf.run()
sc_options = vge.ShortCircuitOptions(mid_line_fault=True, branch_index=1, branch_fault_locations=0.5, fault_type=vge.FaultType.LG)
sc = vge.ShortCircuitDriver(network, options=sc_options, pf_options=pf_options, pf_results=pf.results) vge.run()
Fix Line 108 (g = branch.G) should be removed from the split_branch() function in the ShortCircuitDriver class. Line 123 (middle_bus.Zf = complex(r_fault, x_fault)) should be changed to (middle_bus.r_fault = r_fault and middle_bus.x_fault = x_fault).
@alexblancoeroots Could you look into this?
@SanPen, @alexblancoeroots there are additional issues with the short-circuit routine for mid-line faults. When a dummy bus is added, the number of buses increases by 1, but the self.pf_results.voltage vector keeps the same size as before the dummy bus. This leads to an error when single_short_circuit() function is called.
yes, I was thinking that is the reason why that function was left a bit outdated. The way would be to split the line in the schematics and then select the new DB bus for short circuit.
Yeah, that's the approach I used through the VeraGridEngine. It could make sense to send the pf_results to the split_branch() function and append the voltage of the dummy bus.
I think the proper solution would be to do the following:
- Compile the
MultiCircuitto aNumericalCircuit(nc) - modify
ncto have 1 more bus, and 2 more branches - the data for the new bus should be the same as the data for the bus from of the failed branch.
- deactivate the failed branch
- the data of the new branches should be proportional to the deactivated branch as per the fault location
- run a power flow with
nc - run the short circuit with
nc
All of the above x3 for the ABC short circuits.
I implemented a somewhat different approach. Instead of modifying the numerical circuit, I modified the grid, as it seemed easier to modify and conduct a power flow study. The split_branch() function is as follows:
@staticmethod
def split_branch(branch: BRANCH_TYPES, fault_position: float, r_fault: float, x_fault: float):
"""
Split a branch by a given distance
:param branch: Branch of a circuit
:param fault_position: per unit distance measured from the "from" bus (0 ~ 1)
:param r_fault: Fault resistance in p.u.
:param x_fault: Fault reactance in p.u.
:return: the two new Branches and the mid short-circuited bus
"""
# Extract the parameters of the faulted line
r = branch.R
x = branch.X
b = branch.B
r2 = branch.R2
x2 = branch.X2
b2 = branch.B2
r0 = branch.R0
x0 = branch.X0
b0 = branch.B0
rate = branch.rate
name = branch.name
# Initialize a dummy bus
dummy_bus = Bus(
Vnom=branch.Vf,
r_fault=r_fault,
x_fault=x_fault
)
# Initialize the auxiliary branches
first_branch = Line(
name=name + ' part 1',
bus_from=branch.bus_from,
bus_to=dummy_bus,
r=r * fault_position,
x=x * fault_position,
b=b * fault_position,
r2=r2 * fault_position,
x2=x2 * fault_position,
b2=b2 * fault_position,
r0=r0 * fault_position,
x0=x0 * fault_position,
b0=b0 * fault_position,
rate=rate
)
second_branch = Line(
name=name + ' part 2',
bus_from=dummy_bus,
bus_to=branch.bus_to,
r=r * (1 - fault_position),
x=x * (1 - fault_position),
b=b * (1 - fault_position),
r2=r2 * (1 - fault_position),
x2=x2 * (1 - fault_position),
b2=b2 * (1 - fault_position),
r0=r0 * (1 - fault_position),
x0=x0 * (1 - fault_position),
b0=b0 * (1 - fault_position),
rate=rate
)
# Deactivate the faulted branch
branch.active = False
# Return the auxiliary branches and the dummy bus
return first_branch, second_branch, dummy_bus
The run() function is modified as follows:
def run(self):
"""
Run a power flow for every circuit.
@return:
"""
# Time the calculation and set the run flag
self.tic()
self._is_running = True
# Copy the original grid and its power flow results
grid = self.grid.copy()
pf_results = self.pf_results
# Check if there are mid-line faults
if self.options.mid_line_fault:
# Split the faulted branch into two auxiliary branches incident to the faulted dummy bus
branch = self.grid.get_branches()[self.options.branch_index]
first_branch, second_branch, dummy_bus = self.split_branch(
branch=branch,
fault_position=self.options.branch_fault_locations,
r_fault=self.options.branch_fault_r,
x_fault=self.options.branch_fault_x
)
# Add the dummy bus and the two auxiliary branches to the grid
grid.add_bus(dummy_bus)
grid.add_line(first_branch)
grid.add_line(second_branch)
# Rerun the power flow for the modified grid
pf = PowerFlowDriver(grid=grid, options=self.pf_options, opf_results=self.opf_results)
pf.run()
# Check if power flow converged and update the results
if not pf.results.converged:
self.logger.add_error('Power flow did not converge on the modified grid for mid-line fault.')
self._is_running = False
self.toc()
return
else:
pf_results = pf.results
# Add the dummy bus index to the list of short-circuited buses
fault_bus_index = len(grid.buses) - 1
self.options.bus_index = fault_bus_index
# Compile the numerical circuit
if self.options.method == MethodShortCircuit.phases:
nc = compile_numerical_circuit_at(
circuit=grid,
t_idx=None,
apply_temperature=self.pf_options.apply_temperature_correction,
branch_tolerance_mode=self.pf_options.branch_impedance_tolerance_mode,
opf_results=self.opf_results,
logger=self.logger,
fill_three_phase=True
)
else:
nc = compile_numerical_circuit_at(
circuit=grid,
t_idx=None,
apply_temperature=self.pf_options.apply_temperature_correction,
branch_tolerance_mode=self.pf_options.branch_impedance_tolerance_mode,
opf_results=self.opf_results,
logger=self.logger,
fill_three_phase=False
)
# Split the grid into islands
calculation_inputs = nc.split_into_islands(
ignore_single_node_islands=self.pf_options.ignore_single_node_islands
)
# Initialize the results for the original/modified grid
results = ShortCircuitResults(
n=nc.nbus,
m=nc.nbr,
n_hvdc=nc.nhvdc,
bus_names=nc.bus_data.names,
branch_names =nc.passive_branch_data.names,
hvdc_names=nc.hvdc_data.names,
bus_types=nc.bus_data.bus_types
)
# Compile the fault impedance array
Zf = self.compile_zf(grid)
# Check if there is a single or multiple islands in the grid
if len(calculation_inputs) > 1: # Multiple islands
# Loop through each island
for i, island in enumerate(calculation_inputs):
# The options give the bus index counting all the grid, however
# for the calculation we need the bus index in the island scheme.
# Hence, we need to convert it, and if the global bus index is not
# on the island, do not perform any calculation.
reverse_bus_index = {b: i for i, b in enumerate(island.bus_data.original_idx)}
island_bus_index = reverse_bus_index.get(self.options.bus_index, None)
if island_bus_index is not None:
res = self.single_short_circuit(
nc=island,
Vpf=pf_results.voltage[island.bus_data.original_idx],
Zf=Zf[island.bus_data.original_idx],
island_bus_index=island_bus_index,
fault_type=self.options.fault_type,
method=self.options.method,
phases=self.options.phases,
Spf=pf_results.Sbus[island.bus_data.original_idx]
)
# Merge results
results.apply_from_island(res, island.bus_data.original_idx, island.passive_branch_data.original_idx)
else: # Single island
res = self.single_short_circuit(nc=calculation_inputs[0],
Vpf=pf_results.voltage,
Zf=Zf,
island_bus_index=self.options.bus_index,
fault_type=self.options.fault_type,
method=self.options.method,
phases=self.options.phases,
Spf=pf_results.Sbus
)
# Merge results
results.apply_from_island(res, calculation_inputs[0].bus_data.original_idx, calculation_inputs[0].passive_branch_data.original_idx)
# expand voltages if there was a bus topology reduction
if nc.topology_performed:
results.voltage = nc.propagate_bus_result(results.voltage)
results.voltage1 = nc.propagate_bus_result(results.voltage1)
results.voltage0 = nc.propagate_bus_result(results.voltage0)
results.voltage2 = nc.propagate_bus_result(results.voltage2)
# Map results back to original grid if mid-line fault
if self.options.mid_line_fault:
self.results.sc_branch_index = self.options.branch_index
self.results = self._map_midline_results_to_original(results, grid, self.grid, self.options.branch_index)
else:
results.sc_type = self.options.fault_type
results.sc_bus_index = self.options.bus_index
self.results = results
# Stop the timer
self._is_running = False
self.toc()
I've also added a static function to map the results from the modified to the original network:
@staticmethod
def _map_midline_results_to_original(results: ShortCircuitResults,
modified_grid: MultiCircuit,
original_grid: MultiCircuit,
original_branch_index: int) -> ShortCircuitResults:
"""
Map results from modified grid back to original grid structure.
"""
# Create results object for original grid
original_results = ShortCircuitResults(
n=original_grid.get_bus_number(),
m=original_grid.get_branch_number(),
n_hvdc=original_grid.get_hvdc_number(),
bus_names=original_grid.get_bus_names(),
branch_names=original_grid.get_branch_names(),
hvdc_names=original_grid.get_hvdc_names(),
bus_types=np.ones(original_grid.get_bus_number()),
area_names=original_grid.get_area_names()
)
# Determine the number of buses in the original grid and map bus results while excluding the dummy bus
n_original_buses = original_grid.get_bus_number()
for attr in ['voltage0', 'voltage1', 'voltage2']:
if hasattr(results, attr) and hasattr(original_results, attr):
original_value = getattr(results, attr)[:n_original_buses]
setattr(original_results, attr, original_value)
# Get branch names from original and modified grids
original_branch_names = original_results.branch_names
modified_branch_names = results.branch_names
# Create a mapping from original branch names to modified branch indices
name_to_modified_idx = {name: idx for idx, name in enumerate(modified_branch_names)}
# Get the name of the original faulted branch
original_faulted_branch_name = original_branch_names[original_branch_index]
# Find the auxiliary branches by name
first_branch_name = original_faulted_branch_name + ' part 1'
second_branch_name = original_faulted_branch_name + ' part 2'
first_branch_idx = name_to_modified_idx.get(first_branch_name)
second_branch_idx = name_to_modified_idx.get(second_branch_name)
# Map branch results
for attr in ['Sf0', 'Sf1', 'Sf2', 'St0', 'St1', 'St2',
'If0', 'If1', 'If2', 'It0', 'It1', 'It2',
'loading0', 'loading1', 'loading2',
'losses0', 'losses1', 'losses2',
'Vbranch0', 'Vbranch1', 'Vbranch2']:
if hasattr(results, attr) and hasattr(original_results, attr):
modified_values = getattr(results, attr)
original_values = np.zeros(len(original_branch_names), dtype=modified_values.dtype)
# Map each original branch to its corresponding result
for orig_idx in range(len(original_branch_names)):
if orig_idx == original_branch_index:
# This is the faulted branch - combine results from the two auxiliary branches
if first_branch_idx is not None and second_branch_idx is not None:
if 'Sf' in attr or 'If' in attr:
# For the "from" side variables, extract the first branch values
original_values[orig_idx] = modified_values[first_branch_idx]
elif 'St' in attr or 'It' in attr:
# For the "to" side variables, extract the second branch values
original_values[orig_idx] = modified_values[second_branch_idx]
elif 'loading' in attr:
# For loading, take the maximum of the two segments
original_values[orig_idx] = max(abs(modified_values[first_branch_idx]),
abs(modified_values[second_branch_idx]))
elif 'losses' in attr:
# For losses, add values of the two segments
original_values[orig_idx] = (modified_values[first_branch_idx] +
modified_values[second_branch_idx])
else:
# Regular branch - find its index in the modified grid by name
original_branch_name = original_branch_names[orig_idx]
modified_idx = name_to_modified_idx.get(original_branch_name)
if modified_idx is not None:
original_values[orig_idx] = modified_values[modified_idx]
setattr(original_results, attr, original_values)
# Copy other attributes
original_results.sc_type = results.sc_type
# Return the results for the original grid
return original_results
Hi @slazar394 !
Looks nice, however I'll try to explain why this cannot be the normal operation. We have two options.
In general, during simulations, objects should be read-only. This means that when a function receives a circuit, this circuit should not be modified under no circumstances. The reason for that is to allow parallelism and predictable behavior. What would happen if we call this function twice? there would be several buses and branches created, producing an unexpected behavior. This would be even worse if we parallelize.
That is why I said that the way to go is to use the numerical circuit, which is fungible in this context. After the simulation we should take care of the results corresponding to the original input, but shouldn't be too hard.
The other option is simply to add a split branch function in the grid (isn't there one yet?) so that the user is the one in charge of understanding the grid modification, and the simulation is not doing anything strange for them.