codeflash-agent/evals/templates/memory-hard/tests/test_pipeline.py
Kevin Turcios 33faedf427
Add Unstructured report, rewrite statusline, format evals/scripts (#20)
* Add Unstructured engagement report as uv workspace member

Three-tier Plotly Dash app (Executive Brief, Engineering Team, Full
Detail) with data in JSON, theme constants in theme.py, and Dash
production improvements (Google Fonts, clientside callbacks, meta tags).

Also: add .playwright-mcp/ to .gitignore, add reports/* ruff overrides,
remove tracked .codeflash/observability/read-tracker.

* Rewrite statusline to derive context from git state

Detects active area from changed files (reports, packages, plugin,
.codeflash, case-studies, evals), falls back to branch name convention
(perf/*, feat/*, fix/*), shows dirty indicator. Uses whoami for
cross-platform user detection.

* Add pre-push lint rule to commit guidelines

* Exclude .codeflash/ from ruff linting

Benchmark and profiling scripts in .codeflash/ are scratch work, not
package source. Excluding them prevents CI failures from ad-hoc scripts.

* Run ruff format across packages, scripts, evals, and plugin refs

* Fix github-app async test failures in CI

Add asyncio_mode = "auto" to root pytest config so async tests
are detected when running from the repo root via uv run pytest packages/.
2026-04-15 03:06:16 -05:00

76 lines
2.6 KiB
Python

from pipeline.core import process_readings
def test_basic():
    """Smoke test: readings are grouped by sensor_type with per-group counts."""

    def reading(rid, kind, ts, value):
        # Build one raw reading dict; location is a fixed stub for this test.
        return {
            "id": rid,
            "sensor_type": kind,
            "timestamp": ts,
            "value": value,
            "metadata": {"location": {"lat": 0, "lng": 0}},
        }

    raw = [
        reading(1, "temp", "2024-01-01T00:00:00", 22.5),
        reading(2, "temp", "2024-01-01T01:00:00", 23.0),
        reading(3, "humidity", "2024-01-01T00:00:00", 45.0),
    ]
    result = process_readings(raw)
    assert len(result) == 2
    assert result["temp"]["count"] == 2
    assert result["humidity"]["count"] == 1
def test_large_dataset():
    """Production-scale dataset — process_readings uses too much memory.

    With 50k readings, peak memory is far higher than the input data size.
    The goal is to reduce memory usage while preserving correctness.
    """
    sensor_types = [f"sensor-{i}" for i in range(50)]

    def make_reading(i):
        # One synthetic reading; every field is a deterministic function of i,
        # so the dataset is reproducible run to run.
        return {
            "id": i,
            "sensor_type": sensor_types[i % len(sensor_types)],
            "timestamp": f"2024-{(i % 12) + 1:02d}-{(i % 28) + 1:02d}T{(i % 24):02d}:00:00",
            "value": round(20.0 + (i % 100) * 0.1, 2),
            "unit": "celsius",
            "metadata": {
                "location": {
                    "lat": round(37.0 + (i % 10) * 0.1, 4),
                    "lng": round(-122.0 + (i % 10) * 0.1, 4),
                    "altitude": float(i % 50),
                },
                "firmware": f"v{1 + i % 3}.{i % 10}.0",
                "calibration_date": "2024-01-15",
                "sensor_config": {
                    "sample_rate": 100,
                    "precision": "high",
                    "filter": "kalman",
                },
                "deployment": {
                    "site": f"site-{i % 10}",
                    "rack": f"rack-{chr(65 + i % 8)}",
                    "position": i % 20,
                },
                "tags": [f"tag-{i % 5}", f"tag-{(i + 1) % 5}"],
                "history": [f"event-{j}" for j in range(3)],
            },
        }

    raw = [make_reading(i) for i in range(50_000)]
    result = process_readings(raw)
    # 50 sensor types, 50k readings round-robin over them => 1000 each.
    assert len(result) == 50
    assert all(stats["count"] == 1000 for stats in result.values())
    assert all(stats["mean"] > 0 for stats in result.values())