"""
This file gathers different functions used in the DispaSET pre-processing tools
@author: Sylvain Quoilin
"""
from __future__ import division
import logging
import os
import shutil
import sys
import numpy as np
import pandas as pd
from ..common import commons
from ..misc.gdx_handler import write_variables
from ..misc.str_handler import clean_strings, shrink_to_64
def pd_timestep(hours):
"""
    Function that converts a time step in hours into a pandas frequency string (e.g. '1h', '15min', ...)
"""
if not isinstance(hours, (int, float)):
        logging.critical('Time steps must be provided in hours (integer or float number)')
sys.exit(1)
if hours == 1:
return '1h'
elif hours == 0.25:
return '15min'
elif hours == 24:
return '24h'
    else:
        logging.warning('Time step of ' + str(hours) + ' hours is not supported by pd_timestep')
        return ''
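# Illustrative usage sketch (not part of the library): the returned strings are
# valid pandas frequency aliases and can be fed to pd.date_range directly.
def _example_pd_timestep():
    assert pd_timestep(1) == '1h' and pd_timestep(0.25) == '15min'
    # Build a quarter-hourly index for one hour (hypothetical dates):
    return pd.date_range('2016-01-01 00:00', periods=4, freq=pd_timestep(0.25))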
def EfficiencyTimeSeries(config, plants, Temperatures):
"""
Function that calculates an efficiency time series for each unit
    For generation units, the efficiency is constant in time (for now)
    For p2h units, the efficiency is defined as the COP, which can be
    temperature-dependent or not
If it is temperature-dependent, the formula is:
COP = COP_nom + coef_a * (T-T_nom) + coef_b * (T-T_nom)^2
:param config: Dispa-SET config file
:param plants: Pandas dataframe with the original list of units
:param Temperatures: Dataframe with the temperature for all relevant units
:returns: Dataframe with a time series of the efficiency for each unit
"""
Efficiencies = pd.DataFrame(columns=plants.index, index=config['idx_long'])
for u in plants.index:
z = plants.loc[u, 'Zone']
        if plants.loc[u, 'Technology'] in commons['tech_p2ht'] and 'Tnominal' in plants \
                and pd.notna(plants.loc[u, 'Tnominal']):
            eff = plants.loc[u, 'COP'] + plants.loc[u, 'coef_COP_a'] * (Temperatures[z] - plants.loc[u, 'Tnominal']) + \
                  plants.loc[u, 'coef_COP_b'] * (Temperatures[z] - plants.loc[u, 'Tnominal']) ** 2
elif (plants.loc[u, 'Technology'] in commons['tech_p2ht']) or \
(plants.loc[u, 'Technology'] in commons['tech_heat']):
eff = plants.loc[u, 'COP']
else:
eff = plants.loc[u, 'Efficiency']
Efficiencies[u] = eff
return Efficiencies
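# Illustrative sketch (hypothetical coefficients) of the temperature-dependent
# COP formula used above, with COP_nom = 3, coef_a = 0.05, coef_b = -0.001 and
# T_nom = 15 degC:
def _example_cop_formula():
    T = pd.Series([0.0, 15.0, 30.0])  # hypothetical ambient temperatures
    cop = 3.0 + 0.05 * (T - 15.0) - 0.001 * (T - 15.0) ** 2
    assert cop[1] == 3.0  # at T = T_nom, the COP equals its nominal value
    return cop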
def select_units(units, config):
"""
Function returning a new list of units by removing the ones that have unknown
technology, zero capacity, or unknown zone
:param units: Pandas dataframe with the original list of units
    :param config: Dispa-SET config dictionary
:return: New list of units
"""
for unit in units.index:
if units.loc[unit, 'Technology'] == 'Other':
logging.warning('Removed Unit ' + str(units.loc[unit, 'Unit']) + ' since its technology is unknown')
units.drop(unit, inplace=True)
        elif (units.loc[unit, 'PowerCapacity'] == 0) and ((units.loc[unit, 'STOMaxChargingPower'] == 0) or
                                                          pd.isna(units.loc[unit, 'STOMaxChargingPower'])):
logging.warning('Removed Unit ' + str(units.loc[unit, 'Unit']) + ' since it has a null capacity')
units.drop(unit, inplace=True)
elif units.loc[unit, 'Zone'] not in config['zones']:
logging.warning('Removed Unit ' + str(units.loc[unit, 'Unit']) + ' since its zone (' + str(
units.loc[unit, 'Zone']) + ') is not in the list of zones')
units.drop(unit, inplace=True)
units.index = range(len(units))
return units
def incidence_matrix(sets, set_used, parameters, param_used):
"""
    This function generates the incidence matrix of the lines between the nodes
    A particular case is considered for the node "Rest Of the World" (RoW), which is not explicitly defined in Dispa-SET
"""
for i, l in enumerate(sets[set_used]):
[from_node, to_node] = l.split('->')
if (from_node.strip() in sets['n']) and (to_node.strip() in sets['n']):
parameters[param_used]['val'][i, sets['n'].index(to_node.strip())] = 1
parameters[param_used]['val'][i, sets['n'].index(from_node.strip())] = -1
elif (from_node.strip() in sets['n']) and (to_node.strip() == 'RoW'):
parameters[param_used]['val'][i, sets['n'].index(from_node.strip())] = -1
elif (from_node.strip() == 'RoW') and (to_node.strip() in sets['n']):
parameters[param_used]['val'][i, sets['n'].index(to_node.strip())] = 1
else:
logging.error("The line " + str(
l) + " contains unrecognized nodes (" + from_node.strip() + ' or ' + to_node.strip() + ")")
return parameters[param_used]
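# Illustrative sketch (hypothetical two-zone network with one external line):
def _example_incidence_matrix():
    sets = {'l': ['BE -> FR', 'FR -> RoW'], 'n': ['BE', 'FR']}
    parameters = {'LineNode': {'val': np.zeros((2, 2))}}
    inc = incidence_matrix(sets, 'l', parameters, 'LineNode')
    # Row 0: BE is the origin (-1) and FR the destination (+1) of the line;
    # row 1 only carries the -1 of FR since RoW is not an explicit node.
    assert (inc['val'] == np.array([[-1, 1], [0, -1]])).all()
    return inc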
def interconnections(Simulation_list, NTC_inter, Historical_flows):
"""
    Function that checks for the possible interconnections of the zones included
    in the simulation. If an interconnection occurs between two of the simulated
    zones, it extracts the NTC between those two zones. If it occurs between a
    simulated zone and a country outside the simulation, it extracts the
    historical physical flows; it does so for each such pair and sums them,
    creating the interconnection of this zone with the Rest of the World (RoW).
    :param Simulation_list: List of simulated zones
    :param NTC_inter: Day-ahead net transfer capacities (pd dataframe)
    :param Historical_flows: Historical flows (pd dataframe)
    :return: NTC dataframe for the simulated zones, flows dataframe with the RoW, and list of all interconnections
"""
index = NTC_inter.index.tz_localize(None).intersection(Historical_flows.index.tz_localize(None))
if len(index) == 0:
logging.error('The two input dataframes (NTCs and Historical flows) must have the same index. '
'No common values have been found')
sys.exit(1)
elif len(index) < len(NTC_inter) or len(index) < len(Historical_flows):
diff = np.maximum(len(Historical_flows), len(NTC_inter)) - len(index)
logging.warning('The two input dataframes (NTCs and Historical flows) do not share the same index, '
'although some values are common. The intersection has been considered and ' + str(diff) +
' data points have been lost')
# Checking that all values are positive:
if (NTC_inter.values < 0).any():
pos = np.where(NTC_inter.values < 0)
logging.warning('At least one NTC value is negative, for example in line ' + str(NTC_inter.columns[pos[1][0]]) +
' and time step ' + str(NTC_inter.index[pos[0][0]]))
if (Historical_flows.values < 0).any():
pos = np.where(Historical_flows.values < 0)
logging.warning('At least one historical flow is negative, for example in line ' +
str(Historical_flows.columns[pos[1][0]]) + ' and time step ' +
str(Historical_flows.index[pos[0][0]]))
all_connections = []
simulation_connections = []
# List all connections from the dataframe headers:
ConList = Historical_flows.columns.tolist() + [x for x in NTC_inter.columns.tolist() if
x not in Historical_flows.columns.tolist()]
for connection in ConList:
z = connection.split(' -> ')
if z[0] in Simulation_list:
all_connections.append(connection)
if z[1] in Simulation_list:
simulation_connections.append(connection)
elif z[1] in Simulation_list:
all_connections.append(connection)
df_zones_simulated = pd.DataFrame(index=index)
for interconnection in simulation_connections:
if interconnection in NTC_inter.columns:
df_zones_simulated[interconnection] = NTC_inter[interconnection]
logging.info('Detected interconnection ' + interconnection +
'. The historical NTCs will be imposed as maximum flow value')
interconnections1 = df_zones_simulated.columns
# Display a warning if a zone is isolated:
for z in Simulation_list:
if not any([z in conn for conn in interconnections1]) and len(Simulation_list) > 1:
            logging.warning('Zone ' + z + ' does not appear to be connected to any other zone in the NTC table. '
                            'It will be simulated in isolation')
df_RoW_temp = pd.DataFrame(index=index)
connNames = []
for interconnection in all_connections:
if interconnection in Historical_flows.columns and interconnection not in simulation_connections:
df_RoW_temp[interconnection] = Historical_flows[interconnection]
connNames.append(interconnection)
    compare_set = set()
    # Note: zone names are assumed to be two-character codes (e.g. 'BE'):
    for k in connNames:
        if k[0:2] not in compare_set and k[0:2] in Simulation_list:
            compare_set.add(k[0:2])
df_zones_RoW = pd.DataFrame(index=index)
while compare_set:
nameToCompare = compare_set.pop()
exports = []
imports = []
for name in connNames:
if nameToCompare[0:2] in name[0:2]:
exports.append(connNames.index(name))
logging.info('Detected interconnection ' + name + ', happening between a simulated zone '
'and the rest of the world. The historical flows will be imposed to the model')
elif nameToCompare[0:2] in name[6:8]:
imports.append(connNames.index(name))
logging.info('Detected interconnection ' + name + ', happening between the rest of the world '
'and a simulated zone. The historical flows will be imposed to the model')
if len(exports) > 0:
flows_out = pd.concat(df_RoW_temp[connNames[exports[i]]] for i in range(len(exports)))
flows_out = flows_out.groupby(flows_out.index).sum()
flows_out.name = nameToCompare + ' -> RoW'
df_zones_RoW[nameToCompare + ' -> RoW'] = flows_out
if len(imports) > 0:
flows_in = pd.concat(df_RoW_temp[connNames[imports[j]]] for j in range(len(imports)))
flows_in = flows_in.groupby(flows_in.index).sum()
flows_in.name = 'RoW -> ' + nameToCompare
df_zones_RoW['RoW -> ' + nameToCompare] = flows_in
interconnections2 = df_zones_RoW.columns
inter = list(interconnections1) + list(interconnections2)
return df_zones_simulated, df_zones_RoW, inter
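# Illustrative sketch (hypothetical zones: BE and FR are simulated, NL is not),
# using a tz-naive hourly index:
def _example_interconnections():
    idx = pd.date_range('2016-01-01', periods=24, freq='1h')
    ntc = pd.DataFrame({'BE -> FR': 1000.0, 'FR -> BE': 1000.0}, index=idx)
    flows = pd.DataFrame({'BE -> NL': 500.0, 'NL -> BE': 300.0}, index=idx)
    ntcs, flows_row, lines = interconnections(['BE', 'FR'], ntc, flows)
    # 'BE -> FR' keeps its NTC, while the flows with NL are aggregated into
    # the 'BE -> RoW' and 'RoW -> BE' connections:
    assert set(lines) == {'BE -> FR', 'FR -> BE', 'BE -> RoW', 'RoW -> BE'}
    return ntcs, flows_row, lines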
# Helpers
def _mylogspace(low, high, N):
"""
Self-defined logspace function in which low and high are the first and last values of the space
"""
    # shifting all values so that the space spans [low, high] (exact for low = 0, as used in this module)
    space = np.logspace(0, np.log10(high + low + 1), N) - (low + 1)
return space
def _find_nearest(array, value):
"""
Self-defined function to find the index of the nearest value in a vector
"""
idx = (np.abs(array - value)).argmin()
return idx
def _reverse_dict(dict_):
"""
Reverse Dictionary (Key, Value) to (Value, Key)
:param dict_: Dictionary to reverse
"""
new_dic = {}
for k, v in dict_.items():
for x in v:
new_dic[x] = k
return new_dic
def _split_list(list_):
"""
    Split list elements into a string with " - " separator
:param list_: List to split
"""
res = str()
# remove empty elements from the list:
newlist = [l for l in list_ if (str(l) != 'nan') and (str(l) != '')]
for l in newlist:
if l != newlist[-1]:
res += str(l) + " - "
else:
res += str(l)
return res
def _list2dict(list_, agg): return {key: agg for key in list_}
def _flatten_list(l):
"""
Function that unfolds nested lists
Example:
[1, 3, ['aa','bb'],4] is turned into [1,3, 'aa', 'bb', 4]
"""
flat_list = []
for sublist in l:
if isinstance(sublist, list):
for item in sublist:
flat_list.append(item)
else:
flat_list.append(sublist)
return flat_list
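# Illustrative sketch (hypothetical inputs) of the small helpers above:
def _example_helpers():
    assert _reverse_dict({'gas': ['GT', 'CC']}) == {'GT': 'gas', 'CC': 'gas'}
    assert _split_list(['BE', '', 'GTUR', np.nan]) == 'BE - GTUR'
    assert _flatten_list([1, 3, ['aa', 'bb'], 4]) == [1, 3, 'aa', 'bb', 4]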
def _merge_two_dicts(x, y):
"""Given two dicts, merge them into a new dict as a shallow copy.
    Kept for compatibility between Python 2 and 3
inspired by: https://stackoverflow.com/questions/38987/how-to-merge-two-dictionaries-in-a-single-expression
"""
z = x.copy()
z.update(y)
return z
def _get_index(df_, idx):
former_indexes = [_flatten_list(list(df_.loc[i]['FormerIndexes'].values)) for i in idx]
former_units = [_flatten_list(list(df_.loc[i]['FormerUnits'].values)) for i in idx]
return former_indexes, former_units
def _create_mapping(merged_df):
mapping = {"NewIndex": {}, 'FormerIndexes': merged_df['FormerIndexes'].to_dict()}
mapping['NewIndex'] = _reverse_dict(mapping['FormerIndexes'])
return mapping
def _new_unit_names(df_merged, df_, string_keys):
# if merged unit, create name -> else take old name for unit
keys = ['FormerIndexes'] + string_keys
create_unit_name = lambda x: str(x.FormerIndexes) + " - " + df_.iloc[x.FormerIndexes[0]]['Unit'] if len(
x.FormerIndexes) == 1 else shrink_to_64(clean_strings(_split_list(list(x[keys].values))))
df_merged['Unit'] = df_merged.apply(create_unit_name, axis=1)
return df_merged.set_index('Unit', drop=False)
def _linearize_ramping(plants):
'''
Function that converts the integer constraints for the power plants into ramping rates
'''
#ramping_up = (lambda row: min(1/ (row["MinDownTime"]+1E-9) / 60, row["RampUpRate"]))
#ramping_down = (lambda row: min(1/ (row["MinUpTime"]+1E-9) / 60, row["RampDownRate"]))
#plants["RampUpRate"] = plants.apply(ramping_up, axis=1)
#plants["RampDownRate"] = plants.apply(ramping_down, axis=1)
plants["RampUpRate"] = plants['PartLoadMin'] * ( 1 / (np.maximum(1,plants['MinDownTime'])*60 + 1e-9)) + (1 - plants['PartLoadMin'])*np.minimum(plants['RampUpRate'],1/60)
plants["RampDownRate"] = plants['PartLoadMin'] * ( 1 / (np.maximum(1,plants['MinUpTime'])*60 + 1e-9)) + (1 - plants['PartLoadMin'])*np.minimum(plants['RampDownRate'],1/60)
def group_plants(plants, method, df_grouped=False, group_list=None):
"""
This function returns the final dataframe with the merged units and their characteristics
:param plants: Pandas dataframe with each power plant and their characteristics
(following the DispaSET format)
    :param method: Clustering method ('Standard', 'MILP', 'LP clustered' or 'Integer clustering')
:param df_grouped: Set to True if this plants dataframe has already been grouped and contains the column
"FormerIndexes"
:param group_list: List of columns whose values must be identical in order to group two units
:return: A list with the merged plants and the mapping between the original and merged units
"""
    # Definition of the columns used to group the units (only if not provided by the caller):
    if group_list is None:
        if ('Zone_th' in plants) and plants['Zone_th'].notna().all() and \
                ('Zone_h2' in plants) and plants['Zone_h2'].notna().all():
            group_list = ['Zone', 'Zone_th', 'Zone_h2', 'Technology', 'Fuel', 'CHPType']
        elif ('Zone_th' in plants) and plants['Zone_th'].notna().all():
            group_list = ['Zone', 'Zone_th', 'Technology', 'Fuel', 'CHPType']
        else:
            group_list = ['Zone', 'Technology', 'Fuel', 'CHPType']
    # Definition of the merged power plants dataframe:
plants_merged = pd.DataFrame(columns=plants.columns)
grouped = plants.groupby(group_list, as_index=False)
agg_dict = create_agg_dict(plants, method=method)
# plants_merged = plants_merged.append(grouped.agg(agg_dict))
plants_merged = pd.concat([plants_merged, grouped.agg(agg_dict)])
if method == "Integer clustering":
plants_merged['StartUpCost'] = plants_merged['StartUpCost'] / plants_merged['Nunits']
plants_merged['NoLoadCost'] = plants_merged['NoLoadCost'] / plants_merged['Nunits']
idx = [list(i.values) for i in list(grouped.groups.values())]
if not df_grouped:
plants_merged['FormerIndexes'] = [list(plants.loc[i]['index'].values) for i in idx]
plants_merged['FormerUnits'] = [list(plants.loc[i]['Unit'].values) for i in idx]
else:
# case in which the plants have already been clustered once => nested lists in FormerIndexes
former_indexes, former_units = _get_index(plants, idx)
plants_merged['FormerIndexes'] = list(former_indexes)
plants_merged['FormerUnits'] = list(former_units)
return plants_merged
def create_agg_dict(df_, method="Standard"):
"""
    This function returns a dictionary with the proper aggregation method
    for each column of the units table, depending on the clustering method
Author: Matthias Zech
"""
# lambda functions for other aggregations than standard aggregators like min/max,...
wm_pcap = lambda x: np.average(x.astype(float),
weights=df_.loc[x.index, "PowerCapacity"]) # weighted mean with weight=PowerCapacity
wm_nunit = lambda x: np.average(x.astype(float),
weights=df_.loc[x.index, "Nunits"]) # weighted mean with weight=NUnits
get_ramping_cost = lambda x: wm_pcap(
(1 - df_.loc[x.index, "PartLoadMin"]) * x + df_.loc[x.index, "StartUpCost"] / df_.loc[x.index, "PowerCapacity"])
min_load = lambda x: np.min(x * df_.loc[x.index, "PowerCapacity"]) / df_.loc[x.index, "PowerCapacity"].sum()
if method in ("Standard", "MILP"):
sum_cols = ["PowerCapacity", "STOCapacity", "STOMaxChargingPower", "InitialPower", "CHPMaxHeat"]
weighted_avg_cols = [
"RampUpRate",
"RampDownRate",
"MinUpTime",
"MinDownTime",
"StartUpCost",
"NoLoadCost",
"Efficiency",
"MinEfficiency",
"STOChargingEfficiency",
"CO2Intensity",
"STOSelfDischarge",
"CHPPowerToHeat",
"CHPPowerLossFactor",
'COP',
            'Tnominal',
'coef_COP_a',
'coef_COP_b',
'WaterConsumption',
'WaterWithdrawal',
'RampingCost'
]
min_cols = ["StartUpTime"]
nunits = ["Nunits"]
# Define aggregators
agg_dict = _list2dict(sum_cols, 'sum')
agg_dict = _merge_two_dicts(agg_dict, _list2dict(weighted_avg_cols, wm_pcap))
agg_dict = _merge_two_dicts(agg_dict, _list2dict(min_cols, 'min'))
agg_dict = _merge_two_dicts(agg_dict, _list2dict(['PartLoadMin'], min_load))
# agg_dict = _merge_two_dicts(agg_dict, _list2dict(ramping_cost, get_ramping_cost))
agg_dict = _merge_two_dicts(agg_dict, _list2dict(nunits, lambda x: 1))
        agg_dict = dict((k, v) for k, v in agg_dict.items() if k in df_.columns)  # remove unnecessary columns
return agg_dict
elif method == "LP clustered":
sum_cols = ["PowerCapacity", "STOCapacity", "STOMaxChargingPower", "InitialPower", "CHPMaxHeat"]
weighted_avg_cols = [
"RampUpRate",
"RampDownRate",
"MinUpTime",
"MinDownTime",
"StartUpCost",
"NoLoadCost",
"Efficiency",
"MinEfficiency",
'PartLoadMin',
"STOChargingEfficiency",
"CO2Intensity",
"STOSelfDischarge",
"CHPPowerToHeat",
"CHPPowerLossFactor",
'COP',
            'Tnominal',
'coef_COP_a',
'coef_COP_b',
'WaterConsumption',
'WaterWithdrawal',
'RampingCost'
]
min_cols = ["StartUpTime"]
# ramping_cost = ["RampingCost"]
nunits = ["Nunits"]
# Define aggregators
agg_dict = _list2dict(sum_cols, 'sum')
agg_dict = _merge_two_dicts(agg_dict, _list2dict(weighted_avg_cols, wm_pcap))
agg_dict = _merge_two_dicts(agg_dict, _list2dict(min_cols, 'min'))
#agg_dict = _merge_two_dicts(agg_dict, _list2dict(['PartLoadMin'], lambda x: 0))
# agg_dict = _merge_two_dicts(agg_dict, _list2dict(ramping_cost, get_ramping_cost))
agg_dict = _merge_two_dicts(agg_dict, _list2dict(nunits, lambda x: 1))
        agg_dict = dict((k, v) for k, v in agg_dict.items() if k in df_.columns)  # remove unnecessary columns
return agg_dict
elif method == "Integer clustering":
sum_cols = ["Nunits", "StartUpCost",'NoLoadCost',]
weighted_avg_cols = ['PowerCapacity',
'RampUpRate',
'RampDownRate',
'MinUpTime',
'MinDownTime',
'Efficiency',
'MinEfficiency',
'STOChargingEfficiency',
'CO2Intensity',
'STOSelfDischarge',
'STOCapacity',
'STOMaxChargingPower',
'PartLoadMin',
'StartUpTime',
'RampingCost',
'CHPPowerToHeat',
'CHPPowerLossFactor',
'CHPMaxHeat',
'COP',
                             'Tnominal',
'coef_COP_a',
'coef_COP_b',
'WaterConsumption',
'WaterWithdrawal'
]
# Define aggregators
agg_dict = _list2dict(sum_cols, 'sum')
agg_dict = _merge_two_dicts(agg_dict, _list2dict(weighted_avg_cols, wm_nunit))
        agg_dict = dict((k, v) for k, v in agg_dict.items() if k in df_.columns)  # remove unnecessary columns
return agg_dict
else:
logging.critical('Clustering method not properly specified. Should be one of the following options:'
' LP clustered, MILP, Standard, Integer clustering')
sys.exit(1)
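# Illustrative sketch (hypothetical three-column table): the returned dictionary
# maps each column actually present in the table to its aggregator.
def _example_create_agg_dict():
    df = pd.DataFrame({'PowerCapacity': [100.0, 50.0], 'Efficiency': [0.4, 0.5],
                       'Nunits': [1, 1]})
    agg = create_agg_dict(df, method='Standard')
    assert agg['PowerCapacity'] == 'sum'  # capacities are summed
    assert callable(agg['Efficiency'])    # efficiencies are capacity-weighted
    return agg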
def clustering(plants_in, method="Standard", Nslices=20, PartLoadMax=0.1, Pmax=30):
"""
    Merge excessively disaggregated power units.
:param plants_in: Pandas dataframe with each power plant and their characteristics
(following the DispaSET format)
    :param method: Clustering method ('Standard', 'MILP', 'LP clustered', 'LP', 'Integer clustering' or 'No clustering')
    :param Nslices: Number of slices used to fingerprint each power plant's characteristics.
                    The slices categorize the power plant data
                    (fewer slices means that the plants are aggregated more easily)
:param PartLoadMax: Maximum part-load capability for the unit to be clustered
:param Pmax: Maximum power for the unit to be clustered
:return: A list with the merged plants and the mapping between the original and merged units
@author: Matthias Zech
"""
# do not alter the original plants table:
plants = plants_in.copy()
    # Checking that the required columns are present in the input pandas dataframe:
required_inputs = ['Unit', 'PowerCapacity', 'PartLoadMin', 'RampUpRate', 'RampDownRate', 'StartUpTime',
'MinUpTime', 'MinDownTime', 'NoLoadCost', 'StartUpCost', 'Efficiency']
for input_value in required_inputs:
if input_value not in plants.columns:
logging.error("The plants dataframe requires a '" + input_value + "' column for clustering")
sys.exit(1)
if "Nunits" not in plants:
plants["Nunits"] = 1
    # avoid divisions by zero in the capacity-weighted aggregations:
    plants.loc[plants['PowerCapacity'] == 0, 'PowerCapacity'] = 1e-9
Nunits = len(plants)
plants.index = range(Nunits)
plants_merged = pd.DataFrame(columns=plants.columns)
# Fill nan values:
string_keys = ["Zone", "Technology", "Fuel", "CHPType"]
    for key in string_keys:
        plants[key] = plants[key].fillna("")
    for key in ['PartLoadMin', 'StartUpTime', 'MinUpTime', 'MinDownTime', 'NoLoadCost', 'StartUpCost',
                'WaterWithdrawal', 'WaterConsumption']:
        plants[key] = plants[key].fillna(0)
    for key in ['RampUpRate', 'RampDownRate']:
        plants[key] = plants[key].fillna(1e9)
    # Keep track of the original row indexes and check whether all units are single (Nunits = 1):
    plants["index"] = plants.index
OnlyOnes = (plants["Nunits"] == 1).all()
if method in ["Standard", "MILP"]:
if OnlyOnes:
            # Three cluster groups are considered in the standard (MILP) formulation:
            #   1) highly flexible units
            #   2) units with a low maximum power
            #   3) units with similar characteristics, where similarity is expressed via fingerprints
            # First, cluster by identical string keys among the flexible and low-Pmax units.
            # Then join the grouped data with the remaining (inflexible, large) units and group
            # the result by string keys and identical technical characteristics (fingerprints).
            # The higher Nslices, the more heterogeneity between the data and the less is merged.
            # The fingerprint of each power plant is the pattern of the slice numbers in
            # which each of its characteristics falls:
            # helper_cols = ['flex', 'low_pmin', 'low_pmax', 'fingerprints']
highly_flexible = (
(plants["RampUpRate"] > 1 / 60)
& (plants["RampDownRate"] > 1 / 60)
& (plants["StartUpTime"] < 1)
& (plants["MinDownTime"] <= 1)
& (plants["MinUpTime"] <= 1)
)
low_pmax = plants["PowerCapacity"] <= Pmax
plants["flex"] = highly_flexible
plants["low_pmax"] = low_pmax
plants["FormerIndexes"] = pd.Series(plants.index.values).apply(lambda x: [x])
plants["FormerUnits"] = pd.Series(plants['Unit'].values).apply(lambda x: [x])
condition = (plants["low_pmax"]) | (plants["flex"])
first_cluster = plants[condition] # all data without other clustering
first_cluster = group_plants(first_cluster, method, False, string_keys)
# first_cluster = first_cluster.append(plants[~condition], ignore_index=True)
first_cluster = pd.concat([first_cluster, plants[~condition]], ignore_index=True)
# Slicing:
bounds = {
"PartLoadMin": np.linspace(0, 1, Nslices),
"RampUpRate": np.linspace(0, 1, Nslices),
"RampDownRate": np.linspace(0, 1, Nslices),
"StartUpTime": _mylogspace(0, 36, Nslices),
"MinUpTime": _mylogspace(0, 168, Nslices),
"MinDownTime": _mylogspace(0, 168, Nslices),
"NoLoadCost": np.linspace(0, 50, Nslices),
"StartUpCost": np.linspace(0, 500, Nslices),
"Efficiency": np.linspace(0, 1, Nslices),
"WaterWithdrawal": np.linspace(0, 200, 250),
"WaterConsumption": np.linspace(0, 20, Nslices),
}
fingerprints = []
for i in first_cluster.index:
fingerprints.append(
[
_find_nearest(bounds["PartLoadMin"], first_cluster["PartLoadMin"][i]),
_find_nearest(bounds["RampUpRate"], first_cluster["RampUpRate"][i]),
_find_nearest(bounds["RampDownRate"], first_cluster["RampDownRate"][i]),
_find_nearest(bounds["StartUpTime"], first_cluster["StartUpTime"][i]),
_find_nearest(bounds["MinUpTime"], first_cluster["MinUpTime"][i]),
_find_nearest(bounds["MinDownTime"], first_cluster["MinDownTime"][i]),
_find_nearest(bounds["NoLoadCost"], first_cluster["NoLoadCost"][i]),
_find_nearest(bounds["StartUpCost"], first_cluster["StartUpCost"][i]),
_find_nearest(bounds["Efficiency"], first_cluster["Efficiency"][i]),
_find_nearest(bounds["WaterConsumption"], first_cluster["WaterConsumption"][i]),
_find_nearest(bounds["WaterWithdrawal"], first_cluster["WaterWithdrawal"][i]),
]
)
first_cluster["fingerprints"] = fingerprints
# the elements of the list are irrelevant for the clustering
first_cluster["fingerprints"] = first_cluster["fingerprints"].astype(str)
low_pmin = first_cluster["PartLoadMin"] <= PartLoadMax
if not first_cluster[low_pmin].empty:
second_cluster = group_plants(first_cluster[low_pmin], method, True, string_keys + ["fingerprints"])
# plants_merged = second_cluster.append(first_cluster[~low_pmin], ignore_index=True)
plants_merged = pd.concat([second_cluster, first_cluster[~low_pmin]], ignore_index=True)
else:
plants_merged = first_cluster[:]
plants = plants.drop(["flex", "low_pmax", "FormerIndexes"], axis=1)
plants_merged = plants_merged.drop(["index", "fingerprints", "flex", "low_pmax"], axis=1)
else: # not all only ones
logging.warning("The standard (or MILP) clustering method is only applicable if all values of the "
"Nunits column in the power plant data are set to one. At least one different value has "
"been encountered. No clustering will be applied")
plants_merged = plants.copy()
plants_merged["FormerIndexes"] = plants["index"].apply(lambda x: [x])
plants_merged["FormerUnits"] = plants["Unit"].apply(lambda x: [x])
elif method == "LP clustered":
if not OnlyOnes:
logging.warning("The LP clustering method aggregates all the units of the same type. Individual units are "
"not considered")
list_mult = [
"PowerCapacity",
"STOCapacity",
"STOMaxChargingPower",
"InitialPower",
"CHPMaxHeat",
]
        # Restricting the list of values to multiply to those that are present in the plants table:
list_mult = [x for x in list_mult if x in plants]
# Modifying the table to remove multiple-units plants:
plants[list_mult] = plants[list_mult].multiply(plants["Nunits"], axis="index")
plants["Nunits"] = 1
OnlyOnes = True
plants_merged = group_plants(plants, method="LP clustered")
elif method == "LP":
if not OnlyOnes:
logging.warning("The LP method aggregates all identical units by multiplying by the Nunits variable")
list_mult = [
"PowerCapacity",
"STOCapacity",
"STOMaxChargingPower",
"InitialPower",
"CHPMaxHeat",
]
        # Restricting the list of values to multiply to those that are present in the plants table:
list_mult = [x for x in list_mult if x in plants]
# Modifying the table to remove multiple-units plants:
plants[list_mult] = plants[list_mult].multiply(plants["Nunits"], axis="index")
plants["Nunits"] = 1
OnlyOnes = True
plants_merged = plants
# formers indexes and units:
plants_merged["FormerIndexes"] = plants["index"].apply(lambda x: [x])
plants_merged["FormerUnits"] = plants["Unit"].apply(lambda x: [x])
elif method == "Integer clustering":
plants_merged = group_plants(plants, method="Integer clustering")
        # Correcting the Nunits field of the clustered plants (must be an integer):
        plants_merged['Nunits'] = plants_merged['Nunits'].round()
elif method == "No clustering":
plants_merged = plants.copy()
plants_merged["FormerIndexes"] = plants["index"].apply(lambda x: [x])
plants_merged["FormerUnits"] = plants["Unit"].apply(lambda x: [x])
else:
logging.error('Method argument ("' + str(method) + '") not recognized in the clustering function')
sys.exit(1)
    # Modify the unit names with the original index number. In case of merged plants,
    # indicate all indexes + the plant type and fuel:
    plants_merged = _new_unit_names(plants_merged, plants, string_keys)
mapping = _create_mapping(plants_merged)
if Nunits != len(plants_merged):
logging.info("Clustered " + str(Nunits) + " original units into " + str(len(plants_merged)) + " new units")
else:
logging.warning("Did not cluster any unit")
# indexes of units which were not clustered:
idx_merged = [i for i in plants_merged.index if len(plants_merged.loc[i, 'FormerIndexes']) == 1]
idx_orig = [plants_merged.loc[i, 'FormerIndexes'][0] for i in idx_merged]
columns = plants_merged.columns.drop(['Unit', 'FormerIndexes', 'FormerUnits'])
plants_merged.loc[idx_merged, columns] = plants.loc[idx_orig, columns].values
if method in ['LP','LP clustered']:
# Transforming the min up/down times into ramping rates
_linearize_ramping(plants_merged)
        # Transforming the start-up costs into ramping costs for the plants that did not go through any clustering:
        ramping_lbd = (lambda row: row["StartUpCost"] / row["PowerCapacity"] if row.RampingCost == 0
                       else row.RampingCost)
        plants_merged["RampingCost"] = plants_merged.apply(ramping_lbd, axis=1)
# reorder columns:
new_columns = [key for key in plants.columns if key in plants_merged]
plants_merged = plants_merged[new_columns + list(plants_merged.columns.drop(new_columns))]
return plants_merged, mapping
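# Illustrative sketch (hypothetical three-unit fleet): the two identical, highly
# flexible gas turbines are merged into one clustered unit under the standard
# method, while the large CCGT unit is kept as-is.
def _example_clustering():
    plants = pd.DataFrame({
        'Unit': ['GT1', 'GT2', 'CCGT1'],
        'Zone': ['BE', 'BE', 'BE'],
        'Technology': ['GTUR', 'GTUR', 'COMC'],
        'Fuel': ['GAS', 'GAS', 'GAS'],
        'CHPType': ['', '', ''],
        'PowerCapacity': [50.0, 50.0, 400.0],
        'PartLoadMin': [0.2, 0.2, 0.4],
        'RampUpRate': [0.05, 0.05, 0.01],
        'RampDownRate': [0.05, 0.05, 0.01],
        'StartUpTime': [0.5, 0.5, 4.0],
        'MinUpTime': [0, 0, 4],
        'MinDownTime': [0, 0, 4],
        'NoLoadCost': [5.0, 5.0, 10.0],
        'StartUpCost': [100.0, 100.0, 500.0],
        'Efficiency': [0.35, 0.35, 0.55],
        'WaterWithdrawal': [0.0, 0.0, 0.0],
        'WaterConsumption': [0.0, 0.0, 0.0],
    })
    merged, mapping = clustering(plants, method='Standard')
    return merged, mapping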
def adjust_unit_capacity(SimData, u_idx, scaling=1, value=None, singleunit=False):
"""
    Function used to modify the installed capacities in the Dispa-SET generated input data
    Both the units table and the parameters of the SimData dictionary are updated at each call
:param SimData: Input data dictionary
:param u_idx: names of the units to be scaled
:param scaling: Scaling factor to be applied to the installed capacity
    :param value: Absolute value of the desired capacity (! Applied only if scaling == 1 !)
:param singleunit: Set to true if the technology should remain lumped in a single unit
:return: New SimData dictionary
"""
# a few checks:
    if len(u_idx) == 0:
logging.warning('adjust_unit_capacity : list of units to be scaled is empty')
return SimData
if scaling > 1E10:
logging.warning('adjust_unit_capacity: scaling factor is too high (' + str(scaling) + ')')
return SimData
# find the units to be scaled:
units = SimData['units'].loc[u_idx,:]
cond = SimData['units'].index.isin(u_idx)
idx = pd.Series(np.where(cond)[0], index=units.index)
TotalCapacity = (units.PowerCapacity * units.Nunits).sum()
if scaling != 1:
RequiredCapacity = TotalCapacity * scaling
elif value is not None:
RequiredCapacity = value
else:
RequiredCapacity = TotalCapacity
if singleunit:
Nunits_new = pd.Series(1, index=units.index)
else:
Nunits_new = (units.Nunits * RequiredCapacity / TotalCapacity).astype('float').round()
Nunits_new[Nunits_new < 1] = 1
Cap_new = units.PowerCapacity * RequiredCapacity / (units.PowerCapacity * Nunits_new).sum()
for u in units.index:
logging.info('Unit ' + u + ':')
logging.info(' PowerCapacity: ' + str(SimData['units'].PowerCapacity[u]) + ' --> ' + str(Cap_new[u]))
logging.info(' Nunits: ' + str(SimData['units'].Nunits[u]) + ' --> ' + str(Nunits_new[u]))
factor = Cap_new[u] / SimData['units'].PowerCapacity[u]
SimData['parameters']['PowerCapacity']['val'][idx[u]] = Cap_new[u]
SimData['parameters']['Nunits']['val'][idx[u]] = Nunits_new[u]
SimData['units'].loc[u, 'PowerCapacity'] = Cap_new[u]
SimData['units'].loc[u, 'Nunits'] = Nunits_new[u]
for col in ['CostStartUp', 'NoLoadCost', 'StorageCapacity', 'StorageChargingCapacity']:
SimData['units'].loc[u, col] = SimData['units'].loc[u, col] * factor
for param in ['CostShutDown', 'CostStartUp', 'PowerInitial', 'RampDownMaximum', 'RampShutDownMaximum',
'RampStartUpMaximum', 'RampUpMaximum', 'StorageCapacity']:
SimData['parameters'][param]['val'][idx[u]] = SimData['parameters'][param]['val'][idx[u]] * factor
for param in ['StorageChargingCapacity', 'StorageInitial']:
# find index, if any:
idx_s = np.where(np.array(SimData['sets']['s']) == u)[0]
if len(idx_s) == 1:
idx_s = idx_s[0]
SimData['parameters'][param]['val'][idx_s] = SimData['parameters'][param]['val'][idx_s] * factor
return SimData
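# Illustrative sketch (hypothetical numbers) of the re-discretization performed
# above: scaling a 2 x 400 MW cluster by 1.2 keeps an integer unit count and
# adapts the per-unit size so that Nunits * PowerCapacity matches the target.
def _example_capacity_rediscretization():
    p_unit, n_units, scaling = 400.0, 2.0, 1.2
    required = p_unit * n_units * scaling        # 960 MW target
    n_new = max(1.0, round(n_units * scaling))   # 2.4 -> 2 units
    p_new = required / n_new                     # 480 MW per unit
    assert n_new * p_new == required
    return n_new, p_new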
def adjust_capacity(inputs, tech_fuel, scaling=1, value=None, singleunit=False, write_gdx=False, dest_path=''):
"""
Function used to modify the installed capacities in the Dispa-SET generated input data
    The function updates the Inputs.p file in the simulation directory at each call
:param inputs: Input data dictionary OR path to the simulation directory containing Inputs.p
:param tech_fuel: tuple with the technology and fuel type for which the capacity should be modified
:param scaling: Scaling factor to be applied to the installed capacity
    :param value: Absolute value of the desired capacity (! Applied only if scaling == 1 !)
:param singleunit: Set to true if the technology should remain lumped in a single unit
:param write_gdx: boolean defining if Inputs.gdx should be also overwritten with the new data
:param dest_path: Simulation environment path to write the new input data. If unspecified, no data is written!
:return: New SimData dictionary
"""
import pickle
if isinstance(inputs, str):
path = inputs
inputfile = path + '/Inputs.p'
if not os.path.exists(path):
sys.exit('Path + "' + path + '" not found')
with open(inputfile, 'rb') as f:
SimData = pickle.load(f)
elif isinstance(inputs, dict):
SimData = inputs
path = SimData['config']['SimulationDirectory']
else:
logging.error('The input data must be either a dictionary or string containing a valid directory')
sys.exit(1)
if not isinstance(tech_fuel, tuple):
sys.exit('tech_fuel must be a tuple')
# find the units to be scaled:
cond = (SimData['units']['Technology'] == tech_fuel[0]) & (SimData['units']['Fuel'] == tech_fuel[1])
u_idx = SimData['units'][cond].index.tolist()
SimData = adjust_unit_capacity(SimData, u_idx, scaling=scaling, value=value, singleunit=singleunit)
if dest_path == '':
logging.info('Not writing any input data to the disk')
else:
if not os.path.isdir(dest_path):
shutil.copytree(path, dest_path)
logging.info('Created simulation environment directory ' + dest_path)
logging.info('Writing input files to ' + dest_path)
with open(os.path.join(dest_path, 'Inputs.p'), 'wb') as pfile:
pickle.dump(SimData, pfile, protocol=pickle.HIGHEST_PROTOCOL)
if write_gdx:
write_variables(SimData['config'], 'Inputs.gdx', [SimData['sets'], SimData['parameters']])
shutil.copy('Inputs.gdx', dest_path + '/')
os.remove('Inputs.gdx')
return SimData
def adjust_flexibility(inputs, flex_units, slow_units, flex_ratio, singleunit=False, write_gdx=False, dest_path=''):
"""
Function used to modify the share of the flexible capacity in the Dispa-SET input data
    The function updates the Inputs.p file in the simulation directory at each call
:param inputs: Input data dictionary OR path to the simulation directory containing Inputs.p
:param flex_units: Dispa-SET units table filtered with only the flexible ones
:param slow_units: Dispa-SET units table filtered with only the slow ones
:param flex_ratio: Target flexibility ratio (single number for all zones)
:param singleunit: Set to true if the technology should remain lumped in a single unit
:param write_gdx: boolean defining if Inputs.gdx should be also overwritten with the new data
:param dest_path: Simulation environment path to write the new input data. If unspecified, no data is written!
:return: New SimData dictionary
"""
import pickle
if isinstance(inputs, str):
path = inputs
inputfile = path + '/Inputs.p'
if not os.path.exists(path):
sys.exit('Path + "' + path + '" not found')
with open(inputfile, 'rb') as f:
SimData = pickle.load(f)
elif isinstance(inputs, dict):
SimData = inputs
path = SimData['config']['SimulationDirectory']
else:
logging.error('The input data must be either a dictionary or string containing a valid directory')
sys.exit(1)
# find the units to be scaled:
units = SimData['units']
# current situation for all zones:"
current_flex_cap = units.PowerCapacity[flex_units].sum()
current_total_cap = current_flex_cap + units.PowerCapacity[slow_units].sum()
current_flex_ratio = current_flex_cap / current_total_cap
    # make a new dataframe with the current flexible, slow and total installed capacities per zone:
    zones = units.loc[flex_units.tolist() + slow_units.tolist(), :].Zone.unique().tolist()
    current = pd.DataFrame(index=zones, columns=['flex', 'slow', 'total', 'ratio'])
    for z in zones:
        current.loc[z, 'flex'] = units.loc[flex_units, :][units.loc[flex_units, :].Zone == z].PowerCapacity.sum()
        current.loc[z, 'slow'] = units.loc[slow_units, :][units.loc[slow_units, :].Zone == z].PowerCapacity.sum()
        current.loc[z, 'total'] = current.loc[z, 'flex'] + current.loc[z, 'slow']
        current.loc[z, 'ratio'] = current.loc[z, 'flex'] / current.loc[z, 'total']
    # target flexible capacity for all zones:
    target_flex_cap = flex_ratio * current_total_cap
    # flexible capacity to be added (positive) or removed (negative):
delta_flex_cap = target_flex_cap - current_flex_cap
    if delta_flex_cap > 0:
# sort the current dataframe, highest flexibility first:
current.sort_values('ratio',ascending=False,inplace=True)
current['cum_sum'] = current.total.cumsum() # save the cumulative zone capacities in a column
# variable containing the remaining flexible capacity to be assigned to the zones:
remaining = delta_flex_cap
        # Iteratively add flexible capacity in each zone:
for z in current.index:
#weight of the current zone compared to the total of remaining zones
weight = current.total[z]/(current_total_cap - current.cum_sum[z] + current.total[z])
# added flexible capacity in this zone is bounded in order not to exceed the total capacity:
added_flex_cap = min(weight*remaining,current.total[z] - current.flex[z])
current.loc[z,'new_flex_cap'] = current.flex[z] + added_flex_cap
current.loc[z,'new_slow_cap'] = current.slow[z] - added_flex_cap
remaining -= added_flex_cap
elif delta_flex_cap < 0:
# sort the current dataframe, highest flexibility first:
current.sort_values('ratio',ascending=True,inplace=True)
current['cum_sum'] = current.total.cumsum() # save the cumulative zone capacities in a column
        # variable containing the remaining flexible capacity to be removed from the zones:
        remaining = -delta_flex_cap
        # Iteratively remove flexible capacity in each zone:
for z in current.index:
#weight of the current zone compared to the total of remaining zones
weight = current.total[z]/(current_total_cap - current.cum_sum[z] + current.total[z])
            # removed flexible capacity in this zone is bounded by the zone's current flexible capacity:
            removed_flex_cap = min(weight * remaining, current.flex[z])
current.loc[z,'new_flex_cap'] = current.flex[z] - removed_flex_cap
current.loc[z,'new_slow_cap'] = current.slow[z] + removed_flex_cap
remaining -= removed_flex_cap
    else:
        current['new_flex_cap'] = current.flex
        current['new_slow_cap'] = current.slow
del current['cum_sum']
    logging.info('New flexible and slow capacities per zone:\n' + str(current))
# last loop where units are actually scaled in each country:
for z in zones:
u_idx = units.loc[flex_units,:][units.loc[flex_units,:].Zone==z].index.tolist()
SimData = adjust_unit_capacity(SimData, u_idx, scaling=current.loc[z,'new_flex_cap']/current.loc[z,'flex'], singleunit=singleunit)
u_idx = units.loc[slow_units,:][units.loc[slow_units,:].Zone==z].index.tolist()
SimData = adjust_unit_capacity(SimData, u_idx, scaling=current.loc[z,'new_slow_cap']/current.loc[z,'slow'], singleunit=singleunit)
# Checking
units_new = SimData['units']
# current situation for all zones:"
new_flex_cap = units_new.PowerCapacity[flex_units].sum()
new_total_cap = new_flex_cap + units_new.PowerCapacity[slow_units].sum()
new_flex_ratio = new_flex_cap / new_total_cap
    if abs(new_flex_ratio - flex_ratio) > 0.01:
        logging.error('the new flexibility ratio (' + str(new_flex_ratio) + ') is not equal to the desired one: ' +
                      str(flex_ratio))
if dest_path == '':
logging.info('Not writing any input data to the disk')
else:
if not os.path.isdir(dest_path):
shutil.copytree(path, dest_path)
logging.info('Created simulation environment directory ' + dest_path)
logging.info('Writing input files to ' + dest_path)
with open(os.path.join(dest_path, 'Inputs.p'), 'wb') as pfile:
pickle.dump(SimData, pfile, protocol=pickle.HIGHEST_PROTOCOL)
if write_gdx:
write_variables(SimData['config'], 'Inputs.gdx', [SimData['sets'], SimData['parameters']])
shutil.copy('Inputs.gdx', dest_path + '/')
os.remove('Inputs.gdx')
return SimData
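# Illustrative sketch (hypothetical numbers) of the redistribution weight used
# above: each zone is weighted by its total capacity relative to the capacity
# of the zones not yet processed, so the last zone absorbs the full remainder.
def _example_flex_weight():
    totals = [100.0, 300.0]  # hypothetical zone capacities, in processing order
    remaining_total = sum(totals)
    weights = []
    for t in totals:
        weights.append(t / remaining_total)
        remaining_total -= t
    assert weights[-1] == 1.0  # the last zone always takes the full remainder
    return weights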
def adjust_ntc(inputs, value=None, write_gdx=False, dest_path=''):
"""
    Function used to modify the net transfer capacities in the Dispa-SET generated input data
    The function updates the Inputs.p file in the simulation directory at each call
    :param inputs: Input data dictionary OR path to the simulation directory containing Inputs.p
    :param value: Multiplicative factor applied to all the net transfer capacities (e.g. 0.5 to halve them)
:param write_gdx: boolean defining if Inputs.gdx should be also overwritten with the new data
:param dest_path: Simulation environment path to write the new input data. If unspecified, no data is written!
:return: New SimData dictionary
"""
import pickle
if isinstance(inputs, str):
path = inputs
inputfile = path + '/Inputs.p'
if not os.path.exists(path):
sys.exit('Path + "' + path + '" not found')
with open(inputfile, 'rb') as f:
SimData = pickle.load(f)
elif isinstance(inputs, dict):
SimData = inputs
path = SimData['config']['SimulationDirectory']
else:
logging.error('The input data must be either a dictionary or string containing a valid directory')
sys.exit(1)
    if value is not None:
        SimData['parameters']['FlowMaximum']['val'] = SimData['parameters']['FlowMaximum']['val'] * value
if dest_path == '':
logging.info('Not writing any input data to the disk')
else:
if not os.path.isdir(dest_path):
shutil.copytree(path, dest_path)
logging.info('Created simulation environment directory ' + dest_path)
logging.info('Writing input files to ' + dest_path)
with open(os.path.join(dest_path, 'Inputs.p'), 'wb') as pfile:
pickle.dump(SimData, pfile, protocol=pickle.HIGHEST_PROTOCOL)
if write_gdx:
write_variables(SimData['config'], 'Inputs.gdx', [SimData['sets'], SimData['parameters']])
shutil.copy('Inputs.gdx', dest_path + '/')
os.remove('Inputs.gdx')
return SimData
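# Illustrative usage sketch (hypothetical paths): halve all NTCs of an existing
# simulation environment and write the result to a new directory.
def _example_adjust_ntc():
    return adjust_ntc('Simulations/simulation_test',  # hypothetical source path
                      value=0.5, write_gdx=False,
                      dest_path='Simulations/simulation_test_lowNTC')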