SDK Guide
nyc311 is usable as a functional SDK for scripts, scheduled jobs, interactive
analysis, and data-processing workflows.
This guide describes the current stable SDK surface in the 1.x line. For
causal-inference engines on panel data, see
factor-factory integration. For a before/after diff of
consumer code from v0.3, see migration-v0-to-v1.md.
ServiceRequestRecord carries both created_date and closed_date (added in
v1.0.1, nullable for unresolved complaints) so resolution-time analyses can
compute closed_date - created_date without bypassing the SDK. All loaders —
CSV, Socrata, bulk_fetch, and the dataframe helpers — preserve closed_date
end-to-end.
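As a rough illustration of the resolution-time calculation described above, the sketch below uses a small stand-in dataclass rather than the real ServiceRequestRecord. The `Record` class and `resolution_days` helper are hypothetical, and the assumption that both fields are `datetime.date` values is ours, not the guide's.

```python
from dataclasses import dataclass
from datetime import date
from statistics import median
from typing import Optional


@dataclass(frozen=True)
class Record:
    """Hypothetical stand-in for ServiceRequestRecord (only the two date fields)."""
    created_date: date
    closed_date: Optional[date]  # None while the complaint is unresolved


def resolution_days(records):
    """Return closed_date - created_date in days, skipping unresolved records."""
    return [
        (r.closed_date - r.created_date).days
        for r in records
        if r.closed_date is not None
    ]


records = [
    Record(date(2025, 1, 1), date(2025, 1, 4)),
    Record(date(2025, 1, 2), None),  # still open, excluded below
    Record(date(2025, 1, 3), date(2025, 1, 10)),
]
days = resolution_days(records)
print(days, median(days))
```

Because closed_date is nullable, any resolution-time code needs an explicit rule for open complaints; dropping them, as here, is only one option.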
The current SDK is built around small, typed steps:
- load records
- extract deterministic topics
- aggregate by geography
- export an artifact if needed
The Functional Workflow
The most common SDK pattern is:
- fetch a live filtered slice into memory
- inspect or enrich it in Python
- export a local snapshot only if you want one
- run topic or resolution analysis against that in-memory data
from datetime import date
from pathlib import Path
from nyc311 import analysis, export, models, pipeline
records = pipeline.fetch_service_requests(
    filters=models.ServiceRequestFilter(
        start_date=date(2025, 1, 1),
        end_date=date(2025, 1, 31),
        geography=models.GeographyFilter("borough", models.BOROUGH_BROOKLYN),
        complaint_types=("Noise - Residential",),
    ),
    socrata_config=models.SocrataConfig(page_size=250, max_pages=1),
)
assignments = analysis.extract_topics(
    records,
    models.TopicQuery("Noise - Residential", top_n=10),
)
summary = analysis.aggregate_by_geography(
    assignments,
    geography="community_district",
)
export.export_topic_table(
    summary,
    models.ExportTarget("csv", Path("brooklyn-noise-topics.csv")),
)
One-Call Pipeline Helper
For workflow code that does not need to manage every intermediate step, use
nyc311.pipeline.run_topic_pipeline(). It can run against a local CSV snapshot,
which you can create from a live fetch:
from pathlib import Path
from nyc311 import export, models, pipeline
records = pipeline.fetch_service_requests(
    filters=models.ServiceRequestFilter(
        geography=models.GeographyFilter("borough", models.BOROUGH_BROOKLYN),
        complaint_types=("Noise - Residential",),
    ),
    socrata_config=models.SocrataConfig(page_size=250, max_pages=1),
)
export.export_service_requests_csv(
    records,
    models.ExportTarget("csv", Path("brooklyn-noise-snapshot.csv")),
)
If you already have a local snapshot, run_topic_pipeline() remains the fastest
one-call path:
from pathlib import Path
from nyc311.pipeline import run_topic_pipeline
summary = run_topic_pipeline(
    "brooklyn-noise-snapshot.csv",
    "Noise - Residential",
    geography="community_district",
    output=Path("topics.csv"),
)
Live Socrata Loading
The SDK supports live loading through SocrataConfig: pass a config where you would otherwise pass a snapshot path.
from datetime import date
from nyc311 import models
from nyc311.pipeline import run_topic_pipeline
summary = run_topic_pipeline(
    models.SocrataConfig(
        app_token=None,
        page_size=500,
        max_pages=1,
    ),
    "Rodent",
    geography="borough",
    filters=models.ServiceRequestFilter(
        start_date=date(2025, 1, 1),
        end_date=date(2025, 1, 31),
    ),
)
Stage A Local Snapshot First
For larger workflows, a good pattern is to fetch once and cache the result as a local CSV snapshot:
from pathlib import Path
from nyc311 import export, io, models
records = io.load_service_requests(
    models.SocrataConfig(page_size=500, max_pages=2),
    filters=models.ServiceRequestFilter(
        complaint_types=("Noise - Residential",),
    ),
)
export.export_service_requests_csv(
    records,
    models.ExportTarget("csv", Path("noise-snapshot.csv")),
)
That keeps notebook iteration reproducible and avoids repeated live API fetches.
Optional DataFrame helpers
The pandas-backed helpers live behind an optional extra so the base package can stay lightweight:
pip install "nyc311[dataframes]"
For the full turnkey stack:
pip install "nyc311[all]"
Or install the broader notebook stack without geospatial dependencies:
pip install "nyc311[science]"
Once installed, the SDK exposes helpers such as
nyc311.dataframes.records_to_dataframe(),
nyc311.dataframes.assignments_to_dataframe(),
nyc311.dataframes.summaries_to_dataframe(),
nyc311.dataframes.gaps_to_dataframe(),
nyc311.dataframes.anomalies_to_dataframe(),
nyc311.dataframes.coverage_to_dataframe(), and
nyc311.dataframes.dataframe_to_records().
Borough Constants
The public SDK includes canonical borough constants and normalization helpers:
from nyc311.models import BOROUGH_BROOKLYN, SUPPORTED_BOROUGHS, normalize_borough_name
BOROUGH_BROOKLYN
SUPPORTED_BOROUGHS
normalize_borough_name("bk")
Boundary-Backed GeoJSON
from pathlib import Path
from nyc311.pipeline import run_topic_pipeline
summary = run_topic_pipeline(
    "brooklyn-noise-snapshot.csv",
    "Noise - Residential",
    geography="community_district",
    output_format="geojson",
    boundaries="community_district_boundaries.geojson",
    output=Path("topics.geojson"),
)
Boundary files must currently include:
- properties.geography
- properties.geography_value
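A quick pre-flight check along these lines can catch a boundary file that lacks the required properties before it reaches the pipeline. This is a minimal sketch: `missing_boundary_properties` is a hypothetical helper, not part of nyc311, and the sample property values are illustrative only.

```python
import json

REQUIRED_PROPERTIES = ("geography", "geography_value")


def missing_boundary_properties(feature_collection):
    """Return (feature_index, missing_keys) pairs for features lacking required properties."""
    problems = []
    for index, feature in enumerate(feature_collection.get("features", [])):
        properties = feature.get("properties") or {}
        missing = [key for key in REQUIRED_PROPERTIES if key not in properties]
        if missing:
            problems.append((index, missing))
    return problems


# Illustrative two-feature collection; the second feature is missing a property.
sample = json.loads("""
{
  "type": "FeatureCollection",
  "features": [
    {"type": "Feature",
     "properties": {"geography": "community_district", "geography_value": "EXAMPLE 01"},
     "geometry": null},
    {"type": "Feature",
     "properties": {"geography": "community_district"},
     "geometry": null}
  ]
}
""")
problems = missing_boundary_properties(sample)
print(problems)
```

An empty result means every feature carries both keys; it says nothing about whether the values match the geography names the SDK expects.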
Bulk Per-Borough Downloads
For multi-year, full-city extracts, nyc311.pipeline.bulk_fetch() splits a
single logical query into one CSV per borough. Each completed CSV is paired with
a .meta.json sidecar capturing the row count, SHA-256 checksum, fetch
timestamp, and the filter parameters used. Subsequent calls skip any borough
whose file already exists, so you can resume an interrupted download.
from datetime import date
from pathlib import Path
from nyc311.pipeline import bulk_fetch
paths = bulk_fetch(
    complaint_types=("Noise - Residential", "Rodent"),
    start_date=date(2023, 1, 1),
    end_date=date(2024, 12, 31),
    cache_dir=Path("data/cache"),
    on_progress=lambda boro, page, rows: print(f"{boro}: page {page} ({rows} rows)"),
)
for csv_path in paths:
    print(csv_path, csv_path.with_suffix(".meta.json"))
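Since each sidecar records a SHA-256 checksum, a cached CSV can be re-verified before it is reused. The sketch below shows one way to do that with the standard library. The `"sha256"` key is a hypothetical field name (this guide does not document the sidecar schema), and the demo writes its own throwaway files rather than reading a real cache.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def sha256_of(path):
    """Stream the file through SHA-256 so large CSVs need not fit in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_sidecar(csv_path, meta_path):
    """Compare a CSV with the checksum stored in its sidecar.

    The "sha256" key is a hypothetical field name, not confirmed by the guide.
    """
    meta = json.loads(meta_path.read_text(encoding="utf-8"))
    return meta.get("sha256") == sha256_of(csv_path)


# Demo on throwaway files so the sketch runs without a real cache.
with tempfile.TemporaryDirectory() as tmp:
    csv_path = Path(tmp) / "brooklyn.csv"
    csv_path.write_text("unique_key,complaint_type\n1,Rodent\n", encoding="utf-8")
    meta_path = csv_path.with_suffix(".meta.json")
    meta_path.write_text(json.dumps({"sha256": sha256_of(csv_path)}), encoding="utf-8")
    intact = matches_sidecar(csv_path, meta_path)
    # Simulate a truncated download by dropping the data row.
    csv_path.write_text("unique_key,complaint_type\n", encoding="utf-8")
    truncated = matches_sidecar(csv_path, meta_path)

print(intact, truncated)
```

A check like this matters because the resume logic skips any borough whose file already exists, so a damaged file would otherwise be reused silently.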
Factor Pipelines
nyc311.factors provides a composable, immutable pipeline for computing
domain-specific metrics over geographic units. Each Factor consumes a
FactorContext (one geographic unit, one time window, the complaints inside it,
and optional population/extras) and returns a single value. A Pipeline runs
many factors over many contexts in a single pass and produces a columnar
PipelineResult.
from datetime import date
from nyc311 import io, models
from nyc311.factors import (
    ComplaintVolumeFactor,
    FactorContext,
    Pipeline,
    ResponseRateFactor,
    TopicConcentrationFactor,
)
records = io.load_service_requests(
    "data/cache/brooklyn-2024.csv",
    filters=models.ServiceRequestFilter(
        start_date=date(2024, 1, 1),
        end_date=date(2024, 12, 31),
    ),
)
# Group records by community district into FactorContexts.
by_cd: dict[str, list[models.ServiceRequestRecord]] = {}
for rec in records:
    by_cd.setdefault(rec.community_district, []).append(rec)
contexts = [
    FactorContext(
        geography="community_district",
        geography_value=cd,
        complaints=tuple(complaints),
        time_window_start=date(2024, 1, 1),
        time_window_end=date(2024, 12, 31),
    )
    for cd, complaints in by_cd.items()
]
pipeline = (
    Pipeline()
    .add(ComplaintVolumeFactor())
    .add(ResponseRateFactor())
    .add(TopicConcentrationFactor())
)
result = pipeline.run(contexts)
df = result.to_dataframe() # requires nyc311[dataframes]
print(df.sort_values("complaint_volume", ascending=False).head())
Pipeline.add() returns a new pipeline rather than mutating in place, so
pipelines are safe to compose and share between callers.
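The practical effect of a non-mutating add() is that a shared base pipeline can be extended in different directions without the branches affecting each other. The sketch below illustrates the idea with a toy class; `TinyPipeline` is a hypothetical stand-in, and nothing here reflects how nyc311's Pipeline is actually implemented.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TinyPipeline:
    """Hypothetical stand-in; nyc311's Pipeline internals are not shown in this guide."""
    factors: tuple = ()

    def add(self, factor):
        # Build a new instance instead of appending to self.factors.
        return TinyPipeline(self.factors + (factor,))


base = TinyPipeline().add("complaint_volume")
with_rate = base.add("response_rate")
with_topics = base.add("topic_concentration")

print(base.factors)
print(with_rate.factors)
print(with_topics.factors)
```

Note that the return value of add() must be kept: calling it and discarding the result leaves the original pipeline unchanged.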
Temporal Panels
nyc311.temporal builds balanced (unit, period) panels from raw
ServiceRequestRecord lists. Treatment events code policy interventions as
per-unit treatment indicators, and inverse-distance spatial weights feed
spatial-econometric workflows downstream.
from datetime import date
from nyc311 import io
from nyc311.temporal import TreatmentEvent, build_complaint_panel
records = io.load_service_requests("data/cache/brooklyn-2023.csv")
panel = build_complaint_panel(
    records,
    geography="community_district",
    freq="ME",  # monthly
    treatment_events=(
        TreatmentEvent(
            name="rat_mitigation_zone_2023",
            description="DOHMH rat mitigation zone designation",
            treated_units=("BK01", "BK02", "BK03"),
            treatment_date=date(2023, 7, 1),
            geography="community_district",
        ),
    ),
)
treated = panel.treatment_group()
controls = panel.control_group()
df = panel.to_dataframe() # MultiIndex (unit_id, period)
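To make "balanced" and "per-unit treatment indicator" concrete, the standard-library sketch below builds a tiny panel by hand. It is not how build_complaint_panel works internally. The data are invented, and the rule that the indicator turns on only for treated units from the treatment date onward is our assumption about the coding.

```python
from collections import Counter
from datetime import date
from itertools import product

# Hypothetical raw observations: (unit_id, month of the complaint).
observations = [
    ("BK01", date(2023, 6, 1)),
    ("BK01", date(2023, 6, 1)),
    ("BK01", date(2023, 8, 1)),
    ("BK04", date(2023, 7, 1)),
]
periods = [date(2023, 6, 1), date(2023, 7, 1), date(2023, 8, 1)]
treated_units = {"BK01"}
treatment_date = date(2023, 7, 1)

counts = Counter(observations)
units = sorted({unit for unit, _ in observations})

# Balanced: every (unit, period) pair gets a row, with 0 where nothing was observed.
panel_rows = [
    {
        "unit_id": unit,
        "period": period,
        "complaint_count": counts.get((unit, period), 0),
        # 1 only for treated units, and only from the treatment date onward (assumed rule).
        "treated": int(unit in treated_units and period >= treatment_date),
    }
    for unit, period in product(units, periods)
]

for row in panel_rows:
    print(row)
```

The zero-filled rows are what distinguish a balanced panel from a plain group-by: a unit with no complaints in a month still appears, with a count of 0.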
Interop with factor-factory
Starting in v1.0.0, PanelDataset can hand its data off to
factor-factory
causal-inference engines directly:
from factor_factory.engines.did import estimate as did_estimate
ff_panel = panel.to_factor_factory_panel()
results = did_estimate(ff_panel, methods=("twfe",), outcome="complaint_count")
print(results[0].att, results[0].ci_95)
See integration.md for the full crosswalk between
PanelDataset and factor_factory.tidy.Panel, and for the list of
factor-factory engine families accessible via
Pipeline.as_factor_factory_estimate.
For spatial weights:
from nyc311.geographies import load_nyc_boundaries
from nyc311.temporal import build_distance_weights, centroids_from_boundaries
boundaries = load_nyc_boundaries("community_district")
centroids = centroids_from_boundaries(boundaries)
weights = build_distance_weights(centroids, threshold_meters=2000.0)
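For intuition about what an inverse-distance weight with a distance threshold looks like, here is a self-contained sketch. It is not the build_distance_weights implementation: the haversine distance, the plain 1/d weight, the lack of row standardization, and the helper names are all assumptions made for illustration, and the coordinates are invented.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_M = 6_371_000.0


def haversine_m(a, b):
    """Great-circle distance in meters between two (lat, lon) points in degrees."""
    lat1, lon1, lat2, lon2 = map(radians, (*a, *b))
    h = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * EARTH_RADIUS_M * asin(sqrt(h))


def inverse_distance_weights(centroids, threshold_meters):
    """Map each unit to {neighbor: 1 / distance} for neighbors within the threshold."""
    weights = {unit: {} for unit in centroids}
    for i, a in centroids.items():
        for j, b in centroids.items():
            if i == j:
                continue
            d = haversine_m(a, b)
            if 0.0 < d <= threshold_meters:
                weights[i][j] = 1.0 / d
    return weights


# Illustrative coordinates, not real district centroids.
centroids = {"A": (40.700, -73.950), "B": (40.710, -73.950), "C": (40.800, -73.950)}
weights = inverse_distance_weights(centroids, threshold_meters=2000.0)
print(weights)
```

With a 2000 m threshold, A and B (about 1.1 km apart) become neighbors while C, roughly 10 km away, has none. Units left without neighbors are worth checking for before running spatial statistics.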
Upstream shapely-backed centroids (nyc-geo-toolkit v0.4+)
nyc311's centroids_from_boundaries returns the shapely-free
approximation (mean of exterior-ring points) as a
dict[str, (lat, lon)] so it feeds directly into
build_distance_weights. This is the lean default.
For publication-grade geometry — correct centroids, optional
representative_point for concave shorelines — use upstream's
shapely-backed helper (pulled in via
pip install "nyc-geo-toolkit[spatial]"):
from nyc_geo_toolkit import centroids_from_boundaries as ngt_centroids
centroid_collection = ngt_centroids(boundaries, representative=True)
centroids = {
    f.geography_value: (
        f.geometry["coordinates"][1],  # lat
        f.geometry["coordinates"][0],  # lon
    )
    for f in centroid_collection.features
}
weights = build_distance_weights(centroids, threshold_meters=2000.0)
The two helpers return different shapes and different numbers — don't swap them mid-analysis.
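The difference in numbers is easy to reproduce on a toy polygon. The sketch below compares a vertex mean of the exterior ring with an area-weighted centroid computed by the shoelace formula, which stands in for what a geometry library would return. Both functions are hypothetical illustrations working in planar coordinates. Whether nyc311 drops the closing duplicate vertex before averaging is an assumption.

```python
def ring_mean(ring):
    """Mean of exterior-ring vertices as (lat, lon); drops the closing duplicate point."""
    points = ring[:-1] if ring[0] == ring[-1] else ring
    lon = sum(p[0] for p in points) / len(points)
    lat = sum(p[1] for p in points) / len(points)
    return (lat, lon)


def area_centroid(ring):
    """Area-weighted centroid of a closed ring via the shoelace formula, as (lat, lon)."""
    twice_area = cx = cy = 0.0
    for (x0, y0), (x1, y1) in zip(ring, ring[1:]):
        cross = x0 * y1 - x1 * y0
        twice_area += cross
        cx += (x0 + x1) * cross
        cy += (y0 + y1) * cross
    return (cy / (3 * twice_area), cx / (3 * twice_area))


# A 4 x 2 rectangle in GeoJSON [lon, lat] order, with extra vertices along the bottom edge.
ring = [(0, 0), (1, 0), (2, 0), (3, 0), (4, 0), (4, 2), (0, 2), (0, 0)]

print(ring_mean(ring))
print(area_centroid(ring))
```

The extra vertices on the bottom edge pull the vertex mean toward that edge, while the area-weighted centroid stays at the rectangle's center. Detailed shorelines have the same effect on real boundaries.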
Statistical Modeling
nyc311.stats is a thin, typed layer over statsmodels, ruptures,
linearmodels, and esda / libpysal. Every routine is opt-in via the stats
extra and degrades cleanly with an ImportError when its dependency is missing.
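The usual way to get this behaviour is to import the optional dependency lazily and re-raise with an actionable message. The sketch below shows that general pattern only. The `require` helper and the message wording are hypothetical and are not taken from nyc311's source.

```python
import importlib


def require(module_name, extra):
    """Import an optional dependency or raise an ImportError naming the extra to install."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name} is required for this routine; "
            f'install it with: pip install "nyc311[{extra}]"'
        ) from exc


# Demonstrate the failure path with a module name that should not exist.
try:
    require("a_module_that_is_not_installed", "stats")
except ImportError as exc:
    message = str(exc)
    print(message)
```

Callers that want to fall back gracefully can catch ImportError around the specific routine rather than around the whole import of nyc311.stats.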
from datetime import date
from nyc311.stats import (
    detect_changepoints,
    interrupted_time_series,
    seasonal_decompose,
)
# A pandas Series of monthly complaint counts indexed by month.
series = panel.to_dataframe()["complaint_count"].groupby(level="period").sum()
series.index = series.index.to_timestamp()
decomposition = seasonal_decompose(series, period=12)
breaks = detect_changepoints(series, method="pelt")
its = interrupted_time_series(series, intervention_date=date(2023, 7, 1))
print(its.level_change, its.p_value_level)
For panel regressions:
from nyc311.stats import panel_fixed_effects
result = panel_fixed_effects(
    panel,
    outcome="complaint_count",
    regressors=("resolution_rate",),
    time_effects=True,
    cluster="entity",
)
print(result.coefficients, result.r_squared)
For spatial autocorrelation:
from nyc311.stats import global_morans_i, local_morans_i
# Keys are unit IDs; if df holds several periods per unit, later rows overwrite earlier ones.
values = {row.Index[0]: row.complaint_count for row in df.itertuples()}
moran = global_morans_i(values, weights)
lisa = local_morans_i(values, weights, permutations=999)
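For readers who want to see what the global statistic measures, the textbook formula fits in a few lines. This sketch is not the nyc311 or esda implementation and omits permutation inference. The `morans_i` function and the nested-dict weights layout are assumptions chosen to keep the example self-contained.

```python
def morans_i(values, weights):
    """Global Moran's I for {unit: value} and {unit: {neighbor: weight}}."""
    units = list(values)
    n = len(units)
    mean = sum(values.values()) / n
    z = {u: values[u] - mean for u in units}
    s0 = sum(w for row in weights.values() for w in row.values())
    cross = sum(w * z[i] * z[j] for i, row in weights.items() for j, w in row.items())
    denom = sum(v * v for v in z.values())
    return (n / s0) * (cross / denom)


# Four units on a line (a-b-c-d) with symmetric binary neighbor weights.
chain = {"a": {"b": 1.0}, "b": {"a": 1.0, "c": 1.0}, "c": {"b": 1.0, "d": 1.0}, "d": {"c": 1.0}}

alternating = morans_i({"a": 1.0, "b": 0.0, "c": 1.0, "d": 0.0}, chain)
clustered = morans_i({"a": 0.0, "b": 0.0, "c": 1.0, "d": 1.0}, chain)
print(alternating, clustered)
```

Alternating values give a strongly negative statistic and clustered values a positive one, which is the pattern the statistic is designed to detect.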
Causal Inference
from nyc311.stats import (
    synthetic_control,
    staggered_did,
    event_study,
    regression_discontinuity,
)
# Synthetic control — counterfactual from donor units
result = synthetic_control(panel, treated_unit="BROOKLYN 03", outcome="complaint_count")
print(result.att, result.donor_weights)
# Staggered difference-in-differences (Callaway–Sant'Anna 2021)
did = staggered_did(panel, outcome="complaint_count")
print(did.aggregated_att, did.aggregated_p_value)
# Event study with pre-trend test
es = event_study(panel, outcome="complaint_count", pre_periods=5, post_periods=5)
print(es.coefficients, es.pre_trend_p_value)
# Sharp regression discontinuity
rd = regression_discontinuity(running_var, outcome, cutoff=0.0)
print(rd.treatment_effect, rd.p_value)
Spatial Econometrics
from nyc311.stats import (
    spatial_lag_model,
    spatial_error_model,
    geographically_weighted_regression,
)
# Spatial lag model (Anselin 1988)
slm = spatial_lag_model(panel, weights, "complaint_count", ("income", "density"))
print(slm.rho, slm.coefficients)
# Spatial error model
sem = spatial_error_model(panel, weights, "complaint_count", ("income", "density"))
print(sem.lam, sem.coefficients)
# Geographically weighted regression (Brunsdon et al. 1996)
gwr = geographically_weighted_regression(values, regressors, coordinates)
print(gwr.local_coefficients, gwr.bandwidth)
Equity & Bias Analysis
from nyc311.stats import (
    oaxaca_blinder_decomposition,
    theil_index,
    reporting_rate_adjustment,
    latent_reporting_bias_em,
)
# Oaxaca-Blinder decomposition — explain resolution-time gaps
ob = oaxaca_blinder_decomposition(
    group_a_df, group_b_df, "resolution_days", ("income", "density")
)
print(ob.explained, ob.unexplained, ob.total_gap)
# Theil index — population-weighted inequality
ti = theil_index(rates, populations, groups=borough_map)
print(ti.total, ti.between_group, ti.within_group)
# Ecometric reporting-rate adjustment (O'Brien 2015)
adj = reporting_rate_adjustment(
    panel, "complaint_rate", ("median_income", "pop_density")
)
print(adj.adjusted_rates, adj.icc)
# Latent reporting-bias EM (Agostini et al. 2025)
em = latent_reporting_bias_em(counts, populations, covariates=covs)
print(em.estimated_true_rates, em.reporting_probabilities)
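The total, between_group, and within_group fields printed for the Theil index above follow from a standard additive decomposition. The sketch below works through the population-weighted Theil T version of it on invented data. nyc311's theil_index may use a different variant or handle zero rates differently, so treat the functions here as hypothetical illustrations.

```python
from math import log


def theil_t(rates, populations):
    """Population-weighted Theil T index of {unit: rate}; assumes every rate is positive."""
    total_pop = sum(populations.values())
    mean = sum(populations[u] * rates[u] for u in rates) / total_pop
    return sum(
        (populations[u] / total_pop) * (rates[u] / mean) * log(rates[u] / mean)
        for u in rates
    )


def theil_decomposition(rates, populations, groups):
    """Split Theil T into between-group and within-group parts."""
    total_pop = sum(populations.values())
    mean = sum(populations[u] * rates[u] for u in rates) / total_pop
    between = within = 0.0
    for group in set(groups.values()):
        members = [u for u in rates if groups[u] == group]
        g_rates = {u: rates[u] for u in members}
        g_pops = {u: populations[u] for u in members}
        g_pop = sum(g_pops.values())
        g_mean = sum(g_pops[u] * g_rates[u] for u in members) / g_pop
        # Group's share of the rate-times-population total (complaints, for per-capita rates).
        share = (g_pop * g_mean) / (total_pop * mean)
        between += share * log(g_mean / mean)
        within += share * theil_t(g_rates, g_pops)
    return between, within


# Invented rates, populations, and grouping for illustration.
rates = {"a": 10.0, "b": 20.0, "c": 30.0, "d": 40.0}
populations = {"a": 100, "b": 200, "c": 100, "d": 100}
groups = {"a": "north", "b": "north", "c": "south", "d": "south"}

total = theil_t(rates, populations)
between, within = theil_decomposition(rates, populations, groups)
print(total, between, within)
```

The between and within parts sum to the total, which makes it possible to say how much of the inequality sits across groups rather than inside them.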
Anomaly Detection & Power Analysis
from nyc311.stats import detect_stl_anomalies, minimum_detectable_effect
# STL-residual anomaly detection
anomalies = detect_stl_anomalies(monthly_series, threshold=2.0)
print(anomalies.anomaly_dates, anomalies.n_anomalies)
# Power analysis for panel experiments
power = minimum_detectable_effect(n_units=59, n_periods=24, icc=0.05)
print(f"MDE: {power.mde:.3f} at 80% power")
Bayesian Small-Area Smoothing
from nyc311.stats import bym2_smooth
# BYM2 model (Riebler et al. 2016) — requires nyc311[bayes]
result = bym2_smooth(observed_counts, expected_counts, adjacency)
print(result.smoothed_rates, result.mixing_parameter)
Self-Exciting Point Processes
from nyc311.stats import fit_hawkes_process
# Hawkes process for complaint clustering (Mohler 2011)
hawkes = fit_hawkes_process(event_timestamps)
print(hawkes.background_rate, hawkes.branching_ratio)
All of the nyc311.stats routines above need the optional stats extra, so install it before using them:
pip install "nyc311[stats]"
pip install "nyc311[bayes]" # BYM2 small-area smoothing (PyMC)
Public Surface
Canonical namespaces
- nyc311.models
- nyc311.io
- nyc311.analysis
- nyc311.geographies
- nyc311.samples
- nyc311.export
- nyc311.pipeline
- nyc311.dataframes
- nyc311.spatial
- nyc311.plotting
- nyc311.presets
- nyc311.factors
- nyc311.temporal
- nyc311.stats: time series, panel regression, spatial autocorrelation, causal inference, spatial econometrics, equity/bias analysis, anomaly detection, power analysis, Bayesian smoothing, and point processes
When To Use The CLI Instead
Use the CLI when you want:
- a repeatable command in CI or a shell script
- a simple workflow that turns a local CSV into an artifact
- no custom Python composition
Use the SDK when you want:
- Socrata ingestion
- interactive analysis or workflow orchestration
- custom filtering, branching, or intermediate inspection
- direct access to typed objects instead of files alone