import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import warnings

When analyzing event data (crime incidents, emergency calls, transactions), researchers must aggregate continuous-time events into discrete periods. This temporal binning creates a Modifiable Areal Unit Problem (MAUP) for event data: the choice of aggregation can affect inference, especially when the statistical model is misspecified.
We compare difference-in-differences estimators for counting processes and show:
- Correct specification matters: Poisson models (multiplicative parallel trends) perform well regardless of bin size. OLS (additive parallel trends) is biased and does not improve with finer bins.
- Temporal MAUP: With misspecified models, aggregation introduces bias that persists even as bins become arbitrarily fine.
- Practical guidance: Use Poisson/nonparametric DiD for count outcomes. OLS is misspecified even with “small” bins.
Data structure: Units \(i \in \{1, \ldots, N\}\) observed over continuous time \(t \in [0, T]\). Each unit experiences treatment at time \(T_{w,i} \in [0,T] \cup \{\infty\}\), where \(T_{w,i} = \infty\) indicates never-treated. Define treatment indicator \(W_i(t) = \mathbb{1}\{t \geq T_{w,i}\}\).
Events: Events occur at times \(\{t_{ij}\}_{j=1}^{N_i(0,T)}\), where \(N_i(s,t)\) counts events in interval \((s,t]\).
Challenge: Unlike traditional panel data with outcomes \(Y_{it}(w) \in \mathbb{R}\) observed on a common discrete grid \(\mathcal{T}\), events arrive at arbitrary times and need not occur at all in a given period, so we need a framework for potential event processes. Common practice in applied research is to pick an ad-hoc bin size (e.g., daily, weekly) and aggregate events into counts, but this choice is seldom motivated statistically. We now define the statistical objects needed to analyze such data and study the properties of natural estimators for causal inference in this setting.
A typical dataset of event log data may look like this, where we have a unit index, a timestamp, and a treatment status indicator:
pd.read_csv('../out/demo_events.csv')

| | unit | time | treatment_status |
|---|---|---|---|
| 0 | 0 | 0.004241 | 0.0 |
| 1 | 0 | 0.023437 | 0.0 |
| 2 | 0 | 0.029554 | 0.0 |
| 3 | 0 | 0.102547 | 0.0 |
| 4 | 0 | 0.143566 | 0.0 |
| ... | ... | ... | ... |
| 41607 | 99 | 47.670318 | 1.0 |
| 41608 | 99 | 48.065107 | 1.0 |
| 41609 | 99 | 49.002619 | 1.0 |
| 41610 | 99 | 49.756089 | 1.0 |
| 41611 | 99 | 49.833030 | 1.0 |
41612 rows × 3 columns
Definition 1 (Potential Intensity): For unit \(i\) under treatment regime \(w \in \{0,1\}\), the potential intensity function \(\lambda_i^{(w)}(t)\) determines the instantaneous event rate at time \(t\).
Definition 2 (Potential Compensator): \[\Lambda_i^{(w)}(t) = \int_0^t \lambda_i^{(w)}(s)ds\] The compensator is the cumulative hazard; \(N_i^{(w)}(0,t) - \Lambda_i^{(w)}(t)\) is a martingale.
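As a quick sanity check on Definition 2, here is a minimal sketch assuming a constant intensity \(\lambda\), so that \(\Lambda(t) = \lambda t\) and the compensator equals the expected event count:

```python
import numpy as np

# Assumed values for illustration: constant intensity lam over the horizon [0, t].
rng = np.random.default_rng(0)
lam, t = 1.5, 10.0

# For a homogeneous Poisson process, N(0, t) ~ Poisson(lam * t) = Poisson(Lambda(t)).
counts = rng.poisson(lam * t, size=100_000)

print(counts.mean())  # sample mean of N(0, t), close to 15
print(lam * t)        # Lambda(t) = 15.0, the compensator evaluated at t
```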
Estimand: Treatment effect on rate ratio \[\psi = \frac{\mathbb{E}[N_i^{(1)}(T_w, T)]}{\mathbb{E}[N_i^{(0)}(T_w, T)]}\]
Under separability (below), this equals \(\psi = e^{\tau}\) where \(\tau\) is the log rate ratio.
Assumption 1 (Multiplicative Parallel Trends): \[\lambda_i^{(1)}(t) = e^{\tau} \lambda_i^{(0)}(t), \quad \forall t \geq T_w\]
Treatment multiplies the baseline intensity by constant factor \(e^{\tau}\). This is the counting process analog of parallel trends.
Assumption 2 (Separable Unit and Time Effects): \[\lambda_i^{(0)}(t) = e^{\alpha_i + \gamma(t)}\]
Unit heterogeneity \(\alpha_i\) and time-varying baseline \(\gamma(t)\) enter additively on the log scale (multiplicatively on the rate scale).
Proposition 1: Under A1-A2, \(\tau\) is identified: \[\tau = \mathbb{E}\left[\log\frac{\lambda_i^{(1)}(t)}{\lambda_i^{(0)}(t)} \;\Big|\; W_i(t)=1\right]\]
Moreover, \(\psi = e^{\tau}\), so the rate ratio is identified from the intensity ratio.
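To see the identification argument in one line, combine A1 and A2: for a treated unit at any \(t \geq T_w\),
\[
\log \lambda_i^{(1)}(t) = \alpha_i + \gamma(t) + \tau
\quad\Longrightarrow\quad
\log \frac{\lambda_i^{(1)}(t)}{\lambda_i^{(0)}(t)} = \tau,
\]
so the log intensity ratio is constant across units and time, and Proposition 1 simply averages this constant.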
To estimate \(\tau\), we observe aggregated counts \(N_{ik}\) in bins \(B_k = [t_{k-1}, t_k)\) for \(k \in \{1, \ldots, K\}\).
Key question: How does the choice of \(K\) (number of bins) affect estimation?
The answer depends on model specification: under the correctly specified Poisson model, aggregation bias vanishes as bins shrink; under the misspecified OLS model, it does not.
This is a temporal MAUP: aggregation affects inference depending on whether the model matches the DGP.
Poisson 2WFE model: Conditional on treatment and fixed effects, \[N_{ik} \sim \text{Poisson}\left(\exp(\alpha_i + \tau W_{ik} + \gamma_k) \cdot |B_k|\right)\]
where \(W_{ik}\) is the fraction of bin \(k\) where unit \(i\) is treated, and \(|B_k|\) is bin width.
Log-likelihood: \[\ell = \sum_{ik} \left[N_{ik}(\alpha_i + \tau W_{ik} + \gamma_k) - e^{\alpha_i + \tau W_{ik} + \gamma_k}|B_k|\right]\]
Proposition 2: If \(\gamma(t) = \sum_k \gamma_k \mathbb{1}\{t \in B_k\}\) (piecewise constant), then Poisson MLE \(\hat{\tau} \xrightarrow{p} \tau\).
Bias from aggregation: If \(\gamma(t)\) varies within bins, bias = \(O(K^{-1})\). Vanishes as bins become fine.
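A minimal sketch of the Poisson 2WFE fit (mirroring the poisson_twfe helper listed in the appendix), assuming a binned panel with columns unit, bin, events, and treatment:

```python
import pyfixest as pf

def fit_poisson_twfe(df_panel):
    # events ~ treatment with unit and bin fixed effects (multiplicative PT).
    # With equal-width bins the exposure term log|B_k| is constant within a bin
    # and is absorbed by the bin fixed effect gamma_k.
    fit = pf.fepois(fml="events ~ treatment | unit + bin", data=df_panel)
    return fit.coef()["treatment"], fit.se()["treatment"]
```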
OLS 2WFE model: \[\mathbb{E}[N_{ik} | W_{ik}, \alpha_i, \gamma_k] = \alpha_i + \tau W_{ik} + \gamma_k\]
This assumes additive parallel trends on the count scale (not log scale).
Problem: When true DGP is multiplicative (Poisson), OLS estimates: \[\hat{\tau}_{\text{OLS}} \approx \mathbb{E}[N_{ik}^{(1)} - N_{ik}^{(0)}]\] the mean count difference, not the log rate ratio \(\tau\).
Key result: OLS bias does not vanish as \(K \to \infty\) because the model is fundamentally misspecified.
Intuition: Even with 1-second bins, \(\mathbb{E}[N_{ik}]\) varies across units due to \(e^{\alpha_i}\). The additive model cannot capture this multiplicative heterogeneity.
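A toy numerical illustration of this intuition, with hypothetical baseline rates for two units:

```python
import numpy as np

tau = 0.5
base_rates = np.array([0.2, 2.0])          # e^{alpha_i} for two units (assumed values)
treated_rates = base_rates * np.exp(tau)   # multiplicative PT: both scaled by e^tau

print(treated_rates / base_rates)          # [1.6487, 1.6487] -> common rate ratio e^tau
print(treated_rates - base_rates)          # [0.1297, 1.2974] -> no common additive effect
```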
Nonparametric DiD estimator: Aggregate to 4 cells: \((g, p) \in \{\text{treated, control}\} \times \{\text{pre, post}\}\).
\[\hat{\lambda}_{gp} = \frac{\sum_{i \in g} N_i(t_p)}{\sum_{i \in g} |t_p|}\]
\[\hat{\tau}_{\text{DiD}} = \log\left(\frac{\hat{\lambda}_{\text{treated,post}} / \hat{\lambda}_{\text{control,post}}}{\hat{\lambda}_{\text{treated,pre}} / \hat{\lambda}_{\text{control,pre}}}\right)\]
Proposition 3: Under A1-A2, \(\hat{\tau}_{\text{DiD}} \xrightarrow{p} \tau\).
Proof sketch: \[\mathbb{E}[\hat{\lambda}_{\text{treated,post}}] = e^{\bar{\alpha}_{\text{treated}} + \tau + \bar{\gamma}_{\text{post}}}\] where \(e^{\bar{\gamma}_{\text{post}}} = \frac{1}{|t_{\text{post}}|} \int_{t_{\text{post}}} e^{\gamma(s)}ds\) and \(e^{\bar{\alpha}_{\text{treated}}}\) is the average of \(e^{\alpha_i}\) over treated units; the other three cells are analogous (without \(\tau\)).
The double difference cancels \(\bar{\alpha}\) (unit selection) and \(\bar{\gamma}\) (time trends).
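Writing out all four cells makes the cancellation explicit:
\[
\hat{\tau}_{\text{DiD}} \xrightarrow{p}
\left[(\bar{\alpha}_{\text{treated}} + \tau + \bar{\gamma}_{\text{post}}) - (\bar{\alpha}_{\text{control}} + \bar{\gamma}_{\text{post}})\right]
- \left[(\bar{\alpha}_{\text{treated}} + \bar{\gamma}_{\text{pre}}) - (\bar{\alpha}_{\text{control}} + \bar{\gamma}_{\text{pre}})\right]
= \tau.
\]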
Advantage: No binning decision. Robust to temporal aggregation by design.
Disadvantage: Cannot accommodate time-varying treatment effects or covariates without additional structure. Less efficient than Poisson when \(K\) is large.
Consider the limiting behavior as bins become arbitrarily fine (\(K \to \infty\)):
Poisson 2WFE: \[\text{Bias}(\hat{\tau}_{\text{Poisson}}) = O(K^{-1}) \to 0\]
OLS 2WFE: \[\text{Bias}(\hat{\tau}_{\text{OLS}}) = \tau - \frac{\text{Cov}(e^{\alpha_i}, W_i)}{\mathbb{E}[e^{\alpha_i}]} \not\to 0\]
Even with daily or hourly bins, OLS remains biased because it tries to estimate an additive effect when the truth is multiplicative.
Implication: With count data, you cannot “bin your way out” of misspecification. This is the temporal analog of MAUP in spatial statistics.
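One way to see this in a given dataset is to refit both 2WFE models across a grid of bin counts \(K\); a minimal sketch, reusing the dgp.py helpers listed in the appendix:

```python
from dgp import prep_binned_df, poisson_twfe, ols_twfe

def tau_by_bin_count(events, treatment_times, T, bin_grid=(5, 10, 25, 50)):
    """Re-estimate tau across bin counts K to probe the temporal MAUP."""
    rows = []
    for K in bin_grid:
        panel = prep_binned_df(events, treatment_times, bins=K, T=T)
        rows.append({
            "K": K,
            "poisson": poisson_twfe(panel)[0],  # should stabilize near the true tau
            "ols": ols_twfe(panel)[0],          # stays on the count-difference scale
        })
    return rows
```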
We generate sparse event data under a multiplicative DGP and compare estimators across different bin sizes. We consider three selection mechanisms: random assignment, selection on the unit fixed effect \(\alpha_i\), and selection on an unobserved (time-invariant) confounder \(U_i\).
# Load simulation results (generated by simulate.py)
results_df = pd.read_csv('../out/simulation_results.csv')
tau_true = 0.5

# Analysis function
def analyze_results(results_df, tau_true):
"""Compute bias, RMSE, and coverage by selection mechanism and estimator"""
summary = results_df.groupby(['selection_mechanism', 'estimator']).agg({
'tau_hat': ['count', 'mean', 'std'],
'se': 'mean'
}).round(4)
summary.columns = ['n_sims', 'mean_estimate', 'sd_estimate', 'mean_se']
summary = summary.reset_index()
# Compute bias and RMSE
summary['bias'] = summary['mean_estimate'] - tau_true
summary['rmse'] = np.sqrt(summary['bias']**2 + summary['sd_estimate']**2)
# Compute coverage
coverage = results_df.groupby(['selection_mechanism', 'estimator']).apply(
lambda x: np.mean((x['ci_lower'] <= tau_true) & (x['ci_upper'] >= tau_true)),
include_groups=False
).reset_index()
coverage.columns = ['selection_mechanism', 'estimator', 'coverage']
summary = summary.merge(coverage, on=['selection_mechanism', 'estimator'])
return summary
summary_stats = analyze_results(results_df, tau_true)

# Visualization by selection mechanism
selection_mechanisms = summary_stats['selection_mechanism'].unique()
n_mechanisms = len(selection_mechanisms)
fig, axes = plt.subplots(n_mechanisms, 3, figsize=(12, 5*n_mechanisms))
if n_mechanisms == 1:
axes = axes.reshape(1, -1)
for idx, mechanism in enumerate(selection_mechanisms):
subset = summary_stats[summary_stats['selection_mechanism'] == mechanism]
# Bias comparison
axes[idx, 0].bar(range(len(subset)), subset['bias'])
axes[idx, 0].set_xticks(range(len(subset)))
axes[idx, 0].set_xticklabels(subset['estimator'], rotation=45, ha='right')
axes[idx, 0].set_ylabel('Bias')
axes[idx, 0].set_title(f'Bias: {mechanism}')
axes[idx, 0].axhline(y=0, color='red', linestyle='--', alpha=0.7)
# RMSE comparison
axes[idx, 1].bar(range(len(subset)), subset['rmse'])
axes[idx, 1].set_xticks(range(len(subset)))
axes[idx, 1].set_xticklabels(subset['estimator'], rotation=45, ha='right')
axes[idx, 1].set_yscale('log')
axes[idx, 1].set_ylabel('RMSE')
axes[idx, 1].set_title(f'RMSE: {mechanism}')
# Coverage comparison
axes[idx, 2].bar(range(len(subset)), subset['coverage'])
axes[idx, 2].set_xticks(range(len(subset)))
axes[idx, 2].set_xticklabels(subset['estimator'], rotation=45, ha='right')
axes[idx, 2].set_ylabel('Coverage Probability')
axes[idx, 2].set_title(f'Coverage: {mechanism}')
axes[idx, 2].axhline(y=0.95, color='red', linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()

# Distribution of estimates by selection mechanism
for mechanism in selection_mechanisms:
fig, ax = plt.subplots(figsize=(12, 6))
mechanism_data = results_df[results_df['selection_mechanism'] == mechanism]
for estimator in mechanism_data['estimator'].unique():
subset = mechanism_data[mechanism_data['estimator'] == estimator]
ax.hist(subset['tau_hat'].dropna(), alpha=0.3, label=estimator, bins=30)
ax.axvline(x=tau_true, color='red', linestyle='--', label=f'True τ = {tau_true}', linewidth=2)
ax.set_xlabel('Estimated τ')
ax.set_ylabel('Frequency')
ax.set_xscale('log')
ax.set_title(f'Distribution of Estimates: {mechanism}')
ax.legend()
plt.show()

Correctly specified estimators (Poisson, DiD): consistent for \(\tau\) (Propositions 2-3), so estimates concentrate around the true value regardless of bin size.

Misspecified estimator (OLS): the misspecification bias derived above does not shrink as bins become finer.
Use Poisson for count outcomes: Even “small” bins (daily data) don’t fix OLS misspecification
Binning is a bias-variance tradeoff for Poisson: Finer bins reduce bias but increase variance. Coarser bins are fine if \(\gamma(t)\) is smooth.
DiD is robust: Nonparametric DiD works without binning decisions. Use when treatment window is short.
Selection on time-invariant unit characteristics is harmless: Unit FEs absorb time-invariant confounders \(U_i\), even those affecting treatment propensity.
The counting process framework developed here is mathematically equivalent to the Andersen-Gill model for recurrent events in survival analysis. This connection clarifies the relationship between our temporal MAUP findings and the literature on discrete-time hazard models.
Intensity = Hazard Function
The intensity function \(\lambda_i(t)\) in our framework is precisely the hazard rate at time \(t\) for unit \(i\) in survival analysis: \[\lambda_i(t) = \lim_{h \to 0} \frac{P(\text{event in } (t, t+h] \mid \text{history up to } t)}{h}\]
Our DGP specification: \[\lambda_i^{(0)}(t) = e^{\alpha_i + \gamma(t)}\] is equivalent to a proportional hazards model with a unit-specific frailty \(e^{\alpha_i}\) and a shared baseline hazard \(e^{\gamma(t)}\).
Compensator = Cumulative Hazard
The compensator \(\Lambda_i(t) = \int_0^t \lambda_i(s)ds\) is the cumulative hazard function in survival analysis. The martingale property \(N_i(t) - \Lambda_i(t)\) is fundamental to both counting process theory and modern survival analysis.
Multiplicative PT = Proportional Hazards
Our multiplicative parallel trends assumption: \[\lambda_i^{(1)}(t) = e^{\tau} \lambda_i^{(0)}(t)\] is exactly the proportional hazards assumption: treatment multiplies the baseline hazard by constant factor \(e^{\tau}\).
The temporal MAUP findings have direct implications for discrete-time duration analysis, where researchers must choose both bin size and link function.
When discretizing a continuous-time counting process, different link functions preserve (or violate) the underlying multiplicative structure:
| Link Function | Model | Aggregation Bias |
|---|---|---|
| Log (Poisson) | \(\log E[N_{ik}] = \alpha_i + \tau W_{ik} + \gamma_k + \log\|B_k\|\) | \(O(K^{-1}) \to 0\) |
| Complementary log-log | \(\log(-\log(1 - h_{ik})) = \alpha_i + \tau W_{ik} + \gamma_k\) | \(O(K^{-1}) \to 0\) |
| Logit | \(\log\frac{h_{ik}}{1-h_{ik}} = \alpha_i + \tau W_{ik} + \gamma_k\) | \(O(K^{-1/2})\) persists |
| Identity (Linear) | \(h_{ik} = \alpha_i + \tau W_{ik} + \gamma_k\) | Large bias persists |
where \(h_{ik}\) is the discrete hazard in bin \(k\) for unit \(i\).
Key insight: The cloglog link is the discrete-time analog of the continuous Poisson process. As bins become fine, cloglog converges to the continuous-time model, while logit and linear probability models remain biased.
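The link follows directly from the Poisson compensator: under the DGP above, the probability of at least one event in bin \(B_k\) is
\[
h_{ik} = 1 - \exp\left(-\int_{B_k} \lambda_i(s)\,ds\right) \approx 1 - \exp\left(-e^{\alpha_i + \tau W_{ik} + \gamma_k}\,|B_k|\right),
\]
so
\[
\log(-\log(1 - h_{ik})) = \alpha_i + \tau W_{ik} + \gamma_k + \log|B_k|,
\]
where the \(\log|B_k|\) offset is common across units and, with equal-width bins, is absorbed by the bin effect \(\gamma_k\).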
The logit link assumes: \[P(\text{event in bin } k) = \frac{e^{\alpha_i + \tau W_{ik} + \gamma_k}}{1 + e^{\alpha_i + \tau W_{ik} + \gamma_k}}\]
For rare events (small bins), \(\text{logit}(p) = \log\frac{p}{1-p} \approx \log(p)\) when \(p \ll 1\). Thus logit approximates the correct model for very fine bins, but the approximation error persists at \(O(K^{-1/2})\).
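A quick numerical check of this approximation, using assumed per-bin event rates:

```python
import numpy as np

lam_dt = np.array([0.2, 0.05, 0.01])  # expected events per bin (assumed values)
h = 1 - np.exp(-lam_dt)               # discrete hazard implied by the Poisson process

cloglog = np.log(-np.log(1 - h))      # exactly log(lam_dt), for any bin size
logit = np.log(h / (1 - h))           # approaches log(lam_dt) only as h -> 0

print(cloglog - np.log(lam_dt))       # [0. 0. 0.]
print(logit - np.log(lam_dt))         # approximation error, shrinking with h
```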
Use cloglog for truly continuous processes: If events can occur at any moment (not just at discrete intervals), cloglog preserves the continuous structure.
Poisson regression is equivalent to cloglog (for rare events): When \(h_{ik} \ll 1\), Poisson and cloglog estimates are nearly identical.
Avoid linear probability models for hazards: Our simulations show these remain severely biased even with daily or hourly bins.
Logit is a compromise: If computational constraints favor logit, recognize it carries some bias for coarse bins, though less than linear models.
A key distinction:
Our framework: Multiple events per unit (crime incidents, hospital visits, transactions). The Andersen-Gill model.
Single-spell duration: One event per unit (time to death, first crime). Uses Cox proportional hazards or discrete-time single-risk models.
The temporal MAUP applies to both settings, but is more severe for recurrent events where aggregation loses rich within-person dynamics.
The temporal MAUP connects several methodological debates:
Spatial MAUP (Openshaw 1984): How geographic aggregation affects inference. We show the temporal analog.
Discrete vs continuous time (Allison 1982, Singer & Willett 1993): The longstanding debate about link functions for discrete hazard models. Our framework provides a DGP-based justification for cloglog.
Count models in DiD (Callaway et al 2021): Recent work on Poisson DiD. We formalize why Poisson is correct and OLS fails, even with fine bins.
Counting process DiD (de Chaisemartin & D’Haultfœuille 2024): Emerging literature on continuous-time treatment effects. Our contribution is showing aggregation cannot fix misspecification.
Current results show Poisson and multiplicative DiD are unbiased when:
- Parallel trends holds (multiplicative)
- Confounders are time-invariant
Open problems:
- Treatment effect heterogeneity: \(\tau_i = \tau + \delta U_i\)
- Time-varying confounding: \(U_{it}\) not absorbed by FEs
- Pre-trends: \(\lambda_i^{(1)}(t) \neq e^{\tau} \lambda_i^{(0)}(t)\) before treatment
The simulation code is organized into two files:
dgp.py: Core functions
- assign_treatment(): Three selection mechanisms
- simulate_counting_process(): Thinning algorithm for inhomogeneous Poisson
- prep_binned_df(): Vectorized binning (118x faster than loops)
- nonparametric_did(): 4-cell DiD
- poisson_twfe(): Poisson 2WFE (multiplicative PT)
- ols_twfe(): OLS 2WFE (additive PT, misspecified)

# ABOUTME: Core DGP and estimation functions for counting process DiD
# ABOUTME: Contains all reusable functions for simulation and analysis
import numpy as np
import pandas as pd
import pyfixest as pf
def assign_treatment(
n_units, alpha, U, T_w, selection_mechanism="random", frac_treated=0.5
):
"""
Assign treatment times based on selection mechanism
Parameters:
-----------
n_units : int
Number of units
alpha : array
Unit fixed effects
U : array
Unobserved confounders
T_w : float
Treatment time for treated units
selection_mechanism : str
One of 'random', 'selection_on_alpha', 'unobserved_confounding'
frac_treated : float
Fraction of units to treat (for random assignment)
Returns:
--------
treatment_times : array
Treatment times (T_w for treated, np.inf for never-treated)
"""
if selection_mechanism == "random":
n_treated = int(n_units * frac_treated)
treated_units = np.random.choice(n_units, n_treated, replace=False)
treatment_times = np.array(
[T_w if i in treated_units else np.inf for i in range(n_units)]
)
elif selection_mechanism == "selection_on_alpha":
propensity = 1 / (1 + np.exp(-alpha))
treatment_indicator = np.random.binomial(1, propensity)
treatment_times = np.where(treatment_indicator == 1, T_w, np.inf)
elif selection_mechanism == "unobserved_confounding":
propensity = 1 / (1 + np.exp(-2 * (U - 0.5)))
treatment_indicator = np.random.binomial(1, propensity)
treatment_times = np.where(treatment_indicator == 1, T_w, np.inf)
else:
raise ValueError(f"Unknown selection mechanism: {selection_mechanism}")
return treatment_times
def simulate_counting_process(
n_units, alpha, U, gamma_fn, tau, beta_U, treatment_times, T
):
"""
Generate events from inhomogeneous Poisson process
Intensity: λ_i(t) = exp(α_i + β_U*U_i + τ*W_i(t) + γ(t))
Uses thinning algorithm for inhomogeneous Poisson process
Parameters:
-----------
n_units : int
Number of units
alpha : array
Unit fixed effects
U : array
Unobserved confounders
gamma_fn : function
Time-varying baseline function γ(t)
tau : float
Treatment effect (on log scale)
beta_U : float
Effect of unobserved confounder on intensity
treatment_times : array
Treatment times for each unit (np.inf for never-treated)
T : float
Total observation period
Returns:
--------
events : list of tuples
        List of (unit_id, event_time, treatment_status) tuples
"""
# Pre-compute gamma_max for efficiency
t_grid = np.linspace(0, T, 1000)
gamma_vals = np.array([gamma_fn(t) for t in t_grid])
gamma_max = np.max(gamma_vals)
# Compute lambda_max for each unit (vectorized)
lambda_max = np.exp(alpha + beta_U * U + np.maximum(tau, 0) + gamma_max) * 1.1
events = []
for i in range(n_units):
t = 0
while t < T:
# Propose next event time
t += np.random.exponential(1 / lambda_max[i])
if t < T:
# Compute actual intensity at time t
w_it = 1.0 if t >= treatment_times[i] else 0.0
lambda_t = np.exp(alpha[i] + beta_U * U[i] + tau * w_it + gamma_fn(t))
# Accept with probability lambda(t)/lambda_max
if np.random.random() < lambda_t / lambda_max[i]:
events.append((i, t, w_it))
return events
def prep_binned_df(events, treatment_times, bins, T):
"""
Create binned panel dataset for Poisson 2WFE
Vectorized implementation using pandas cut/groupby (118x faster than nested loop)
Parameters:
-----------
events : list of tuples
        (unit_id, event_time, treatment_status) tuples
treatment_times : array
Treatment times (np.inf for never-treated)
bins : int
Number of time bins
T : float
Total observation period
Returns:
--------
panel : DataFrame
Columns: unit, bin, events, exposure, treatment
"""
n_units = len(treatment_times)
bin_edges = np.linspace(0, T, bins + 1)
# Convert events to DataFrame and assign bins
if len(events) == 0:
df_events = pd.DataFrame(columns=["unit", "time", "treat"])
else:
df_events = pd.DataFrame(events, columns=["unit", "time", "treat"])
df_events["bin"] = pd.cut(
df_events["time"], bins=bin_edges, labels=False, include_lowest=True
)
# Count events per (unit, bin) using groupby
if len(df_events) > 0:
event_counts = (
df_events.groupby(["unit", "bin"]).size().reset_index(name="events")
)
else:
event_counts = pd.DataFrame(columns=["unit", "bin", "events"])
# Create full panel (all unit x bin combinations)
units = np.arange(n_units)
bin_ids = np.arange(bins)
full_panel = pd.DataFrame(
[(u, b) for u in units for b in bin_ids], columns=["unit", "bin"]
)
# Merge and fill zeros
panel = full_panel.merge(event_counts, on=["unit", "bin"], how="left")
panel["events"] = panel["events"].fillna(0).astype(int)
# Vectorized treatment calculation
panel["t_start"] = bin_edges[panel["bin"]]
panel["t_end"] = bin_edges[panel["bin"] + 1]
panel["exposure"] = panel["t_end"] - panel["t_start"]
treatment_times_arr = treatment_times[panel["unit"].values]
is_never_treated = np.isinf(treatment_times_arr)
overlap_start = np.maximum(panel["t_start"].values, treatment_times_arr)
overlap_end = panel["t_end"].values
overlap_length = np.maximum(0, overlap_end - overlap_start)
panel["treatment"] = np.where(
is_never_treated, 0, overlap_length / panel["exposure"]
)
return panel[["unit", "bin", "events", "exposure", "treatment"]]
def nonparametric_did(events, treatment_times, T_w, T):
"""
Nonparametric DiD estimator using 4-cell approach
Parameters:
-----------
events : list of tuples
        (unit_id, event_time, treatment_status) tuples
treatment_times : array
Treatment times (np.inf for never-treated)
T_w : float
Treatment time cutoff for pre/post split
T : float
Total observation period
Returns:
--------
tau_hat : float
DiD estimate of treatment effect
se_tau : float
Standard error (delta method)
"""
df_events = pd.DataFrame(events, columns=["unit", "time", "treat"])
# Classify units
treated_units = [
i for i, t_treat in enumerate(treatment_times) if not np.isinf(t_treat)
]
control_units = [
i for i, t_treat in enumerate(treatment_times) if np.isinf(t_treat)
]
# Count events in each cell
control_pre_events = len(
df_events[(df_events["unit"].isin(control_units)) & (df_events["time"] < T_w)]
)
control_post_events = len(
df_events[(df_events["unit"].isin(control_units)) & (df_events["time"] >= T_w)]
)
treated_pre_events = len(
df_events[(df_events["unit"].isin(treated_units)) & (df_events["time"] < T_w)]
)
treated_post_events = len(
df_events[(df_events["unit"].isin(treated_units)) & (df_events["time"] >= T_w)]
)
# Compute exposure time
control_pre_exposure = len(control_units) * T_w
control_post_exposure = len(control_units) * (T - T_w)
treated_pre_exposure = len(treated_units) * T_w
treated_post_exposure = len(treated_units) * (T - T_w)
# Compute rates
lambda_control_pre = control_pre_events / control_pre_exposure
lambda_control_post = control_post_events / control_post_exposure
lambda_treated_pre = treated_pre_events / treated_pre_exposure
lambda_treated_post = treated_post_events / treated_post_exposure
# DiD estimator
tau_hat = (np.log(lambda_treated_post) - np.log(lambda_control_post)) - (
np.log(lambda_treated_pre) - np.log(lambda_control_pre)
)
# Standard error using delta method
se_tau = np.sqrt(
1 / treated_post_events
+ 1 / control_post_events
+ 1 / treated_pre_events
+ 1 / control_pre_events
)
return tau_hat, se_tau
def poisson_twfe(df_panel):
"""
Poisson 2WFE estimator using pyfixest
Assumes multiplicative parallel trends:
E[N_ik | W, α, γ] = exp(α_i + τW_ik + γ_k)
Parameters:
-----------
df_panel : DataFrame
Panel data with columns: unit, bin, events, treatment
Returns:
--------
tau_hat : float
Coefficient on treatment
se_tau : float
Standard error
"""
try:
result = pf.fepois(fml="events ~ treatment | unit + bin", data=df_panel)
tau_hat = result.coef()["treatment"]
se_tau = result.se()["treatment"]
return tau_hat, se_tau
except Exception as e:
return np.nan, np.nan
def ols_twfe(df_panel):
"""
OLS 2WFE estimator using pyfixest
Assumes additive parallel trends:
E[N_ik | W, α, γ] = α_i + τW_ik + γ_k
Should be biased when true DGP is multiplicative (Poisson)
Parameters:
-----------
df_panel : DataFrame
Panel data with columns: unit, bin, events, treatment
Returns:
--------
tau_hat : float
Coefficient on treatment
se_tau : float
Standard error
"""
try:
result = pf.feols(fml="events ~ treatment | unit + bin", data=df_panel)
tau_hat = result.coef()["treatment"]
se_tau = result.se()["treatment"]
return tau_hat, se_tau
except Exception as e:
        return np.nan, np.nan

simulate.py: Parallelized runner
- Writes simulation_results.csv

# ABOUTME: Simulation runner for counting process DiD
# ABOUTME: Parallelized simulation that outputs results to simulation_results.csv
# %%
from dgp import (
assign_treatment,
simulate_counting_process,
prep_binned_df,
nonparametric_did,
poisson_twfe,
ols_twfe,
)
import numpy as np
import pandas as pd
import time
from joblib import Parallel, delayed
import warnings
warnings.filterwarnings("ignore")
# %%
def run_single_simulation(
sim_id,
n_units,
alpha_sigma,
gamma_fn,
tau_true,
beta_U_active,
T,
T_w,
frac_treated,
selection_mechanism,
debug=False,
):
"""
Run a single simulation replication
Parameters:
-----------
sim_id : int
Simulation ID (used for random seed)
n_units : int
Number of units
alpha_sigma : float
Std dev of unit fixed effects
gamma_fn : function
Time-varying baseline γ(t)
tau_true : float
True treatment effect
beta_U_active : float
Effect of U on intensity (0 except for unobserved_confounding)
T : float
Total observation period
T_w : float
Treatment time
frac_treated : float
Fraction treated (for random assignment)
selection_mechanism : str
Selection mechanism
Returns:
--------
results : list of dict
Results for each estimator
"""
results = []
# Generate unit characteristics for this simulation
np.random.seed(sim_id)
alpha = np.random.normal(0, alpha_sigma, n_units)
U = np.random.uniform(0, 1, n_units)
# Assign treatment
treatment_times = assign_treatment(
n_units,
alpha,
U,
T_w,
selection_mechanism=selection_mechanism,
frac_treated=frac_treated,
)
# Generate events
events = simulate_counting_process(
n_units, alpha, U, gamma_fn, tau_true, beta_U_active, treatment_times, T
)
if debug:
return events, treatment_times
# Estimator 1: Nonparametric DiD
try:
tau_did, se_did = nonparametric_did(events, treatment_times, T_w, T)
results.append(
{
"sim": sim_id,
"selection_mechanism": selection_mechanism,
"estimator": "Nonparametric DiD",
"tau_hat": tau_did,
"se": se_did,
"ci_lower": tau_did - 1.96 * se_did,
"ci_upper": tau_did + 1.96 * se_did,
}
)
except:
pass
# Estimator 2: Poisson 2WFE (fine bins)
try:
df_panel_fine = prep_binned_df(events, treatment_times, bins=int(T), T=T)
tau_pois_fine, se_pois_fine = poisson_twfe(df_panel_fine)
if not np.isnan(tau_pois_fine):
results.append(
{
"sim": sim_id,
"selection_mechanism": selection_mechanism,
"estimator": "Poisson 2WFE (fine bins)",
"tau_hat": tau_pois_fine,
"se": se_pois_fine,
"ci_lower": tau_pois_fine - 1.96 * se_pois_fine,
"ci_upper": tau_pois_fine + 1.96 * se_pois_fine,
}
)
except:
pass
# Estimator 3: Poisson 2WFE (coarse bins)
try:
df_panel_coarse = prep_binned_df(events, treatment_times, bins=3, T=T)
tau_pois_coarse, se_pois_coarse = poisson_twfe(df_panel_coarse)
if not np.isnan(tau_pois_coarse):
results.append(
{
"sim": sim_id,
"selection_mechanism": selection_mechanism,
"estimator": "Poisson 2WFE (coarse bins)",
"tau_hat": tau_pois_coarse,
"se": se_pois_coarse,
"ci_lower": tau_pois_coarse - 1.96 * se_pois_coarse,
"ci_upper": tau_pois_coarse + 1.96 * se_pois_coarse,
}
)
except:
pass
# Estimator 4: OLS 2WFE (fine bins) - should be biased under multiplicative DGP
try:
df_panel_fine = prep_binned_df(events, treatment_times, bins=int(T), T=T)
tau_ols_fine, se_ols_fine = ols_twfe(df_panel_fine)
if not np.isnan(tau_ols_fine):
results.append(
{
"sim": sim_id,
"selection_mechanism": selection_mechanism,
"estimator": "OLS 2WFE (fine bins)",
"tau_hat": tau_ols_fine,
"se": se_ols_fine,
"ci_lower": tau_ols_fine - 1.96 * se_ols_fine,
"ci_upper": tau_ols_fine + 1.96 * se_ols_fine,
}
)
except:
pass
# Estimator 5: OLS 2WFE (coarse bins)
try:
df_panel_coarse = prep_binned_df(events, treatment_times, bins=5, T=T)
tau_ols_coarse, se_ols_coarse = ols_twfe(df_panel_coarse)
if not np.isnan(tau_ols_coarse):
results.append(
{
"sim": sim_id,
"selection_mechanism": selection_mechanism,
"estimator": "OLS 2WFE (coarse bins)",
"tau_hat": tau_ols_coarse,
"se": se_ols_coarse,
"ci_lower": tau_ols_coarse - 1.96 * se_ols_coarse,
"ci_upper": tau_ols_coarse + 1.96 * se_ols_coarse,
}
)
except:
pass
return results
# %%
# run_single_simulation(
# sim_id=0,
# n_units=100,
# alpha_sigma=0.5,
# gamma_fn=lambda t: 0.3 * np.sin(2 * np.pi *
# t / 7),
# tau_true=0.5,
# beta_U_active=0.5,
# T=50,
# T_w=20,
# frac_treated=0.5,
# selection_mechanism="random",
# )
# %%
def run_simulation(
n_sims=500,
n_units=100,
T=50,
T_w=20,
tau_true=0.5,
gamma_fn=lambda t: 0.3 * np.sin(2 * np.pi * t / 7),
alpha_sigma=0.5,
frac_treated=0.5,
selection_mechanisms=["random", "selection_on_alpha", "unobserved_confounding"],
beta_U=0.5,
n_jobs=-1,
output_file="simulation_results.csv",
):
"""
Run full simulation study with parallel execution
Parameters:
-----------
n_sims : int
Number of simulation replications
n_units : int
Number of units
T : float
Total observation period
T_w : float
Treatment time
tau_true : float
True treatment effect
gamma_fn : function
Time-varying baseline γ(t)
alpha_sigma : float
Std dev of unit fixed effects
frac_treated : float
Fraction treated (for random assignment)
selection_mechanisms : list
List of selection mechanisms to test
beta_U : float
Effect of U on intensity (for unobserved_confounding)
n_jobs : int
Number of parallel jobs (-1 = use all cores)
output_file : str
Output CSV filename
Returns:
--------
results_df : DataFrame
Simulation results
"""
all_results = []
for selection_mechanism in selection_mechanisms:
print(f"\n{'=' * 60}")
print(f"Selection mechanism: {selection_mechanism}")
print(f"{'=' * 60}")
beta_U_active = (
beta_U if selection_mechanism == "unobserved_confounding" else 0.0
)
# Parallel execution
start_time = time.time()
results_list = Parallel(n_jobs=n_jobs, verbose=10)(
delayed(run_single_simulation)(
sim,
n_units,
alpha_sigma,
gamma_fn,
tau_true,
beta_U_active,
T,
T_w,
frac_treated,
selection_mechanism,
)
for sim in range(n_sims)
)
elapsed = time.time() - start_time
# Flatten results
for res_list in results_list:
all_results.extend(res_list)
print(f"Completed in {elapsed:.1f}s ({n_sims / elapsed:.1f} sims/sec)")
# Save results
results_df = pd.DataFrame(all_results)
results_df.to_csv(output_file, index=False)
print(f"\n{'=' * 60}")
print(f"✓ Simulation complete! Results saved to {output_file}")
print(f" Total rows: {len(results_df)}")
print(f"{'=' * 60}")
return results_df
if __name__ == "__main__":
print("Starting parallelized simulation study...")
print(f"Parameters: n_sims=500, n_units=100, T=50, tau=0.5, beta_U=0.5")
print(f"Using all available CPU cores")
results_df = run_simulation(
n_sims=500,
n_units=100,
T=50,
T_w=20,
tau_true=0.5,
beta_U=0.5,
n_jobs=-1,
output_file="out/simulation_results.csv",
)
# Quick summary
print("\nQuick summary by mechanism and estimator:")
summary = (
results_df.groupby(["selection_mechanism", "estimator"])["tau_hat"]
.agg(["mean", "std", "count"])
.round(3)
)
print(summary)