Power Factor and Solar Disaggreation¶
Background¶
Motivation¶
Many regions, especially California, are seeing growing quantities of distributed solar generation, often in the form of PV panels installed on the rooftops of buildings and homes. These installations can inject power back into the electric grid through distribution networks, challenging the traditional uni-directional and centralized model of power systems. Even if a utility knows the size of PV installations on a particular distribution feeder, they have no real time visibility into the output of these PV units. Instead, all they can see is the net load, which is the difference between the full demand and PV output. This lack of visibility can result in a number of operational problems, including a reduced ability to predict rapid changes in net load arising from, say, a passing cloud that diminishes PV output.
Solar disaggregation techniques aim to solve this issue by estimating the full demand and PV output from measurements of net load. Disaggregation can occur at the scale of a home, an entire distribution feeder, or a full interconnection.
We will perform solar disaggregation for the PV feeder in the sunshine dataset.
As a reference, the topology of this data set is the following: img
Power Factor¶
We will express $P_1$, the power output of the PV, as a factor of $P$, the real power demand of the feeder: $$P_1 = fP$$
The net real power load measured at PMU 3 is $P - P_1 = (1 - f)P$. Our disaggregation algorithm will estimate the factor $f$. Given $\theta$ and a measurement of $\theta'$ (the power angles with and without PV output, which could be substituted with power factors $pf$ and $pf'$ we can solve for $f$ as follows.
First, we convert from power factor to the ratio between real and reactive power, denoted $\tau$: $$ \begin{align*} \tau &\equiv \tan(\theta) = \tan(\cos^{-1}(pf)) = \frac{Q}{P} \\ \tau' &= \tan(\theta') = \tan(\cos^{-1}(pf')) = \frac{Q}{(1 - f)P} \\ \therefore \tau' &= \frac{\tau P}{(1 - f)P} = \frac{\tau}{1 - f} \end{align*} $$
Then we solve for $f$:
$$ \Rightarrow f = 1 - \frac{\tau}{\tau'} $$
To obtain $\theta$ or $pf$ ---which correspond to the aggregate feeder demand and are assumed constant--- we can look at the power flow into the feeder during nighttime hours, when PV output is zero. Using this constant value, we can then determine $f$ at various time points during daytime hours from measurements of the power factor or $\theta'$ at those times.
We apply this approach to data from PMU 3. We set $pf$ to a constant value equal to the average of the measured power factor at night over several days. Then, given the measured power factor at some time in the day, we use the equations above to compute $f$. The results are plotted in Fig. 3, visualized just as power factors were visualized earlier.
from datetime import datetime
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
import seaborn as sns
from matplotlib.dates import DateFormatter
from tqdm.notebook import tqdm
import pingthings.timeseries as pt
sns.set_style("darkgrid")
conn = pt.connect()
print(f"{conn.info() = }")
conn.info() = {'major_version': 5, 'minor_version': 34, 'build': '5.34.0', 'proxy': {'proxy_endpoints': []}}
prefix = "sunshine"
angle_streams = conn.streams_in_collection(prefix, is_collection_prefix=True, tags={"unit": "deg"})
vmag_streams = conn.streams_in_collection(prefix, is_collection_prefix=True, tags={"unit": "volts"})
cmag_streams = conn.streams_in_collection(prefix, is_collection_prefix=True, tags={"unit": "amps"})
print(f"{len(angle_streams) = }")
print(f"{len(vmag_streams) = }")
print(f"{len(cmag_streams) = }")
len(angle_streams) = 36 len(vmag_streams) = 18 len(cmag_streams) = 18
# extract only A-phase for voltage and current
phase_a = "1"
curr = "C"
volt = "L"
ang = "ANG"
mag = "MAG"
# we only care about PMUs 1,3,4,5
# C1ANG
curr_angles = pt.StreamSet(
    sorted(
        [
            s
            for s in angle_streams
            if s.name == f"{curr}{phase_a}{ang}"
            and "6" not in s.collection
            and "2" not in s.collection
        ],
        key=lambda x: x.collection,
    )
)
# L1ANG
volt_angles = pt.StreamSet(
    sorted(
        [
            s
            for s in angle_streams
            if s.name == f"{volt}{phase_a}{ang}"
            and "6" not in s.collection
            and "2" not in s.collection
        ],
        key=lambda x: x.collection,
    )
)
volt_mags = pt.StreamSet(
    sorted(
        [
            s
            for s in vmag_streams
            if s.name == f"{volt}{phase_a}{mag}"
            and "6" not in s.collection
            and "2" not in s.collection
        ],
        key=lambda x: x.collection,
    )
)
curr_mags = pt.StreamSet(
    sorted(
        [
            s
            for s in cmag_streams
            if s.name == f"{curr}{phase_a}{mag}"
            and "6" not in s.collection
            and "2" not in s.collection
        ],
        key=lambda x: x.collection,
    )
)
for stream_set in (curr_angles, volt_angles, curr_mags, volt_mags):
    print([f"{stream.collection}.{stream.name}" for stream in stream_set])
['sunshine/PMU1.C1ANG', 'sunshine/PMU3.C1ANG', 'sunshine/PMU4.C1ANG', 'sunshine/PMU5.C1ANG'] ['sunshine/PMU1.L1ANG', 'sunshine/PMU3.L1ANG', 'sunshine/PMU4.L1ANG', 'sunshine/PMU5.L1ANG'] ['sunshine/PMU1.C1MAG', 'sunshine/PMU3.C1MAG', 'sunshine/PMU4.C1MAG', 'sunshine/PMU5.C1MAG'] ['sunshine/PMU1.L1MAG', 'sunshine/PMU3.L1MAG', 'sunshine/PMU4.L1MAG', 'sunshine/PMU5.L1MAG']
def fetch_and_process_chunk(v_stream, c_stream, start, end, first_x_minutes, v_uuid, c_uuid):
    streams = pt.StreamSet([v_stream, c_stream])
    tab = streams.raw_values(start=start, end=end)
    # Convert the 'time' column to naive timestamps (UTC)
    time_column = tab["time"]
    time_naive = pc.cast(time_column, pa.int64())
    # Adjust the time by subtracting 8 hours (8 hours * 3600 seconds/hour * 10^9 nanoseconds/second)
    time_adjusted = pc.subtract(time_naive, pa.scalar(pt.utils.ns_delta(hours=8), pa.int64()))
    # Add the new timezone information (UTC-8)
    time_utc_minus_8 = pc.cast(time_adjusted, pa.timestamp("ns", "UTC"))
    # Replace the 'time' column in the table with the converted column
    tab = tab.set_column(0, "time", time_utc_minus_8)
    results = []
    if tab:
        filtered_tab = pc.drop_null(tab)
        minutes = pc.minute(filtered_tab["time"])
        # Filter rows where the minute is less_equal first_x_minutes-1 (i.e., the first minute of each hour would be 0)
        filter_mask = pc.less_equal(minutes, pa.scalar(first_x_minutes - 1, pa.int64()))
        filtered_data = filtered_tab.filter(filter_mask)
        filtered_data = filtered_data.append_column("hour", pc.hour(filtered_data["time"]))
        wrapped_vang = filtered_data[v_uuid].to_numpy()
        wrapped_vang_rad = np.deg2rad(wrapped_vang)
        wrapped_vang_rad = np.arctan2(np.sin(wrapped_vang_rad), np.cos(wrapped_vang_rad))
        wrapped_cang = filtered_data[c_uuid].to_numpy()
        wrapped_cang_rad = np.deg2rad(wrapped_cang)
        wrapped_cang_rad = np.arctan2(np.sin(wrapped_cang_rad), np.cos(wrapped_cang_rad))
        filtered_data = filtered_data.append_column("wrapped_vang", pa.array(wrapped_vang_rad))
        filtered_data = filtered_data.append_column("wrapped_cang", pa.array(wrapped_cang_rad))
        filtered_data = filtered_data.append_column(
            "pf",
            pc.cos(pc.subtract(filtered_data["wrapped_vang"], filtered_data["wrapped_cang"])),
        )
        grouped = filtered_data.group_by("hour").aggregate([("pf", "mean")])
        day = pc.unique(pc.day_of_week(filtered_data["time"]))[0].as_py()
        for hour_batch in grouped.to_batches():
            table = pa.table(hour_batch)
            for hour, pf_mean in zip(table["hour"], table["pf_mean"]):
                results.append((day, hour.as_py(), pf_mean.as_py()))
        return results
    return None
def analyze_per_unit_time_power_factor(
    v_stream: pt.Stream,
    c_stream: pt.Stream,
    overall_start: int,
    overall_end: int,
    sample_freq: str = "1D",
    first_x_minutes: int = 1,
):
    time_range = pd.date_range(start=overall_start, end=overall_end, freq=sample_freq)
    result = []
    v_uuid = str(v_stream.uuid)
    c_uuid = str(c_stream.uuid)
    # for s,e in tqdm(zip(time_range[:-1], time_range[1:]), total=len(time_range[1:])):
    #     tmp = fetch_and_process_chunk(v_stream, c_stream, s.value, e.value, first_x_minutes, v_uuid, c_uuid)
    #     if tmp:
    #         result.extend(tmp)
    #     else:
    #         continue
    for s, e in tqdm(zip(time_range[:-1], time_range[1:]), total=len(time_range[1:])):
        tmp = fetch_and_process_chunk(
            v_stream, c_stream, s.value, e.value, first_x_minutes, v_uuid, c_uuid
        )
        if tmp:
            result.extend(tmp)
        else:
            continue
    return result
# use the year of 2016
start = pt.utils.to_nanoseconds("2016-04-01")
end = pt.utils.to_nanoseconds("2016-05-01")
freq = "1D"
sample_amount = pt.utils.ns_delta(minutes=5)
pq_files = list(Path(".").glob("*.parquet"))
if pq_files:
    print("Pre-computed files found! Loading dataframes from parquet.")
    df_dict = {p.stem: pd.read_parquet(p) for p in pq_files}
else:
    for vang, cang in zip(volt_angles, curr_angles):
        tmp = analyze_per_unit_time_power_factor(
            v_stream=vang,
            c_stream=cang,
            overall_start=start,
            overall_end=end,
            sample_freq=freq,
            first_x_minutes=5,
        )
        weekday_map = {
            0: "Monday",
            1: "Tuesday",
            2: "Wednesday",
            3: "Thursday",
            4: "Friday",
            5: "Saturday",
            6: "Sunday",
        }
        pf_df = pd.DataFrame(data=tmp, columns=["day", "hour", "pf"])
        pf_df["weekday"] = pf_df["day"].map(weekday_map)
        pf_df.drop(columns=["day"], inplace=True)
        pf_df = pf_df.astype(dtype={"weekday": "category", "hour": int, "pf": float})
        # pf_df.head()
        # pf_df.head()
        agg_pf_df = pf_df.groupby(["weekday", "hour"]).agg("mean")
        # agg_pf_df
        agg_pf_df.to_parquet(f"{vang.collection.replace('/','_')}.parquet")
    pq_files = list(Path(".").glob("*.parquet"))
    df_dict = {p.stem: pd.read_parquet(p) for p in pq_files}
0%| | 0/30 [00:00<?, ?it/s]
/tmp/ipykernel_300451/2831619524.py:31: FutureWarning: The default of observed=False is deprecated and will be changed to True in a future version of pandas. Pass observed=False to retain current behavior or observed=True to adopt the future default and silence this warning.
  agg_pf_df = pf_df.groupby(["weekday", "hour"]).agg("mean")
0%| | 0/30 [00:00<?, ?it/s]
/tmp/ipykernel_300451/2831619524.py:31: FutureWarning: The default of observed=False is deprecated and will be changed to True in a future version of pandas. Pass observed=False to retain current behavior or observed=True to adopt the future default and silence this warning.
  agg_pf_df = pf_df.groupby(["weekday", "hour"]).agg("mean")
0%| | 0/30 [00:00<?, ?it/s]
/tmp/ipykernel_300451/2831619524.py:31: FutureWarning: The default of observed=False is deprecated and will be changed to True in a future version of pandas. Pass observed=False to retain current behavior or observed=True to adopt the future default and silence this warning.
  agg_pf_df = pf_df.groupby(["weekday", "hour"]).agg("mean")
0%| | 0/30 [00:00<?, ?it/s]
/tmp/ipykernel_300451/2831619524.py:31: FutureWarning: The default of observed=False is deprecated and will be changed to True in a future version of pandas. Pass observed=False to retain current behavior or observed=True to adopt the future default and silence this warning.
  agg_pf_df = pf_df.groupby(["weekday", "hour"]).agg("mean")
def process_pf_df(df: pd.DataFrame) -> pd.DataFrame:
    # Reset the index to convert MultiIndex to columns
    df_reset = df.reset_index()
    # Define the specific order for the weekday column, categorical values can have a manually defined order
    weekday_order = [
        "Monday",
        "Tuesday",
        "Wednesday",
        "Thursday",
        "Friday",
        "Saturday",
        "Sunday",
    ]
    df_reset["weekday"] = pd.Categorical(
        df_reset["weekday"], categories=weekday_order, ordered=True
    )
    weekday_to_date = {day: idx for idx, day in enumerate(weekday_order)}
    # Combine weekday and hour into a single datetime object for plotting, year is required, but unnecessary for our plot
    df_reset["datetime"] = df_reset.apply(
        lambda row: datetime(2017, 1, weekday_to_date[row["weekday"]] + 2, row["hour"]),
        axis=1,
    )
    return df_reset
n_rows = len(pq_files)
fig, ax = plt.subplots(nrows=n_rows, ncols=1, figsize=(16, 9), sharex=True)
keys = list(df_dict.keys())
keys.sort(key=lambda x: x.split("_")[1])
for i, key in enumerate(keys):
    df = df_dict[key]
    print(key)
    proc_df = process_pf_df(df).copy()
    rolling_med = proc_df.rolling(2).median(numeric_only=True)
    # Create the scatter plot
    g1 = sns.lineplot(
        data=rolling_med,
        x=proc_df["datetime"],
        y="pf",
        alpha=1,
        ax=ax[i],
        label="median",
    )
    g2 = sns.scatterplot(data=proc_df, x="datetime", y="pf", hue="weekday", ax=ax[i])
    g1._remove_legend(g1.get_legend())
    g2._remove_legend(g2.get_legend())
    # Set the labels and title
    ax[i].set_xlabel("Datetime")
    ax[i].set_ylabel("Power Factor (pf)[-]")
    ax[i].set_title(f"Hourly Power Factor: {key.split('_')[1]}")
    # Format the x-axis to show both the day and hour
    date_form = DateFormatter("%a %H:%M")
    minor_form = DateFormatter("%H")
    ax[i].xaxis.set_major_formatter(date_form)
    ax[i].xaxis.set_minor_formatter(minor_form)
    plt.minorticks_on()
    plt.grid(visible=True, which="major")
    # Adjust the x-axis labels for better readability
    plt.xticks(rotation=45)
plt.show()
sunshine_PMU1 sunshine_PMU3 sunshine_PMU4 sunshine_PMU5
Power factor samples at PMU 3 measures the power flow at the interconnection between the bulk grid and a distribution feeder with a significant PV installation (PMU1).
Here we see a dramatic daily trend, with the power factor flipping from +1 during the dark hours to −1 during daylight hours. If you have an inkling that this pattern arises from the solar generation, you’re correct! The absolute value of the power factor consistently near 1 indicates that the power flow at the interconnection is always predominantly real, with almost no reactive power exchange. The change in sign of the power factor indicates a change in the direction of power flow. At night, when the power factor is +1, power is flowing into the feeder to serve the load.
Baseline Power Factor from PMU3 at night (no PV input)¶
pmu3_df = df_dict["sunshine_PMU3"].reset_index()
pmu3_df
| weekday | hour | pf | |
|---|---|---|---|
| 0 | Friday | 0 | 0.995753 | 
| 1 | Friday | 1 | 0.995672 | 
| 2 | Friday | 2 | 0.994561 | 
| 3 | Friday | 3 | 0.995534 | 
| 4 | Friday | 4 | 0.994536 | 
| ... | ... | ... | ... | 
| 163 | Wednesday | 19 | 0.995417 | 
| 164 | Wednesday | 20 | 0.996655 | 
| 165 | Wednesday | 21 | 0.999151 | 
| 166 | Wednesday | 22 | 0.998742 | 
| 167 | Wednesday | 23 | 0.997478 | 
168 rows × 3 columns
pmu3_pf = pmu3_df[(pmu3_df["hour"] < 4) | (pmu3_df["hour"] > 22)]["pf"].mean()
pmu3_tau = np.tan(np.arccos(pmu3_pf))
pmu3_df["tau_prime"] = np.tan(np.arccos(pmu3_df["pf"]))
pmu3_df["disagg"] = 1 - (pmu3_tau / pmu3_df["tau_prime"])
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(16, 9))
proc_df = process_pf_df(pmu3_df)
rolling_med = proc_df.rolling(2).median(numeric_only=True)
# Create the scatter plot
g1 = sns.lineplot(
    data=rolling_med,
    x=proc_df["datetime"],
    y="disagg",
    alpha=1,
    ax=ax,
    label="median",
)
g2 = sns.scatterplot(data=proc_df, x="datetime", y="disagg", hue="weekday", ax=ax)
g1._remove_legend(g1.get_legend())
g2._remove_legend(g2.get_legend())
# Set the labels and title
ax.set_xlabel("Datetime")
ax.set_ylabel("PV Generator Factor (f)[-]")
ax.set_title("Disaggregation using Power Factor")
# Format the x-axis to show both the day and hour
date_form = DateFormatter("%a %H:%M")
minor_form = DateFormatter("%H")
ax.xaxis.set_major_formatter(date_form)
ax.xaxis.set_minor_formatter(minor_form)
plt.minorticks_on()
# plt.ylim([-0.5,4])
plt.grid(visible=True, which="major")
# Adjust the x-axis labels for better readability
plt.xticks(rotation=45)
plt.show()
A value of $f=1$ implies that the power output of the PV exactly matches the real power demand on the feeder. This will result in zero real power flow from the bulk grid to the feeder through PMU 3. When $f<1$, the PV output is insufficient to meet feeder demand, and the shortfall will be drawn from the bulk grid. When $f>1$, there is a surplus of PV generation, and the excess will flow from the feeder into the main grid, resulting in reverse power flow.
We see that during the peak PV output hours, $f$ often exceeds $1$, resulting in reverse power flow. There is a distinctive weekly trend, with $f$ being significantly larger on weekends, not because of a major change in PV output but presumably due to a marked drop in demand.