Ask HN: Why don't we have a functional DSL for data+embedding+API pipelines?
I’ve been working on a pretty common problem:
   - I have structured data in JSONL files (in.jsonl, out.jsonl)
   - I match lines by a key
   - I transform them into (text, embedding) pairs
   - I optionally filter/map them
   - I batch them (into chunks of 50)
   - I push each batch into an external system (e.g. vector DB, Chroma)
Here’s what it usually looks like in Python:
```
import json

with open("in.jsonl", "r") as fin, open("out.jsonl", "r") as fout:
    # Relies on both files listing the requests in the same order.
    for in_line, out_line in zip(fin, fout):
        in_data = json.loads(in_line)
        out_data = json.loads(out_line)
        if in_data["custom_id"] != out_data["custom_id"]:
            raise ValueError("custom_id mismatch")
        texts = in_data["body"]["input"]
        embeddings = [d["embedding"] for d in out_data["response"]["body"]["data"]]
        for doc, emb in zip(texts, embeddings):
            metadata = {
                "source": f"chunk-{global_ids}",
                # ...
            }
```
We’re in 2025, and this is how we’re wiring data into APIs.
---
Why do we tolerate this?
This is a declarative, streaming, data processing problem. Why aren’t we using something more elegant? Something more composable, like functional pipelines?
I'm asking myself: Why don’t we have a composable, streaming, functional DSL for this kind of task?
---
Why not build it like Unix pipes?
What I want is something that feels like:
   cat input.jsonl \
   | match output.jsonl on custom_id \
   | extract (text, embedding) \
   | filter not-empty \
   | batch 50 \
   | send-to-chroma
In Lisp / Clojure:
   (->> (zip input output)
        (filter (= :custom_id))
        (mapcat (fn [[in out]] (zip (:input in) (:embedding out))))
        (partition-all 50)
        (map send-to-chroma))
In Elixir + Broadway:
   Broadway
   |> read_stream("in.jsonl", "out.jsonl")
   |> match_on(:custom_id)
   |> map(&{&1.text, &1.embedding})
   |> batch_every(50)
   |> send_to_chroma()
And now, back to Python...
We’re stuck writing imperative soup or building hacky DSLs with things like:
   load_json_pairs() \
   | where(is_valid) \
   | select(to_embedding_record) \
   | batch(50) \
   | foreach(send_to_chroma)
And even though libraries like tf.data.Dataset, dask.bag, pandas, or pipe exist, none of them really solve this use case in a cohesive and expressive way. They all focus on either tabular data, or big data, or ML input pipelines – not this "structured data -> transform -> push to API" pattern.
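For reference, the "hacky DSL" style above takes surprisingly little code to get working. Here is a minimal sketch, assuming each stage is a function from iterable to iterable and that `|` is overloaded via `__ror__`. The names `Stage`, `where`, `select`, `batch`, and `foreach` are made up for illustration and do not come from any existing library; there is no retry, concurrency, or backpressure here, which is exactly the missing part.

```python
from itertools import islice


class Stage:
    """Wraps a function from iterable to iterable so stages chain with `|`."""

    def __init__(self, fn):
        self.fn = fn

    def __ror__(self, upstream):
        # Python falls back to this for `upstream | stage` when the
        # left-hand side cannot handle `|` with a Stage itself.
        return self.fn(upstream)


def where(pred):
    return Stage(lambda items: (x for x in items if pred(x)))


def select(fn):
    return Stage(lambda items: (fn(x) for x in items))


def batch(size):
    def chunks(items):
        it = iter(items)
        # Pull up to `size` items at a time until the source is exhausted.
        while chunk := list(islice(it, size)):
            yield chunk
    return Stage(chunks)


def foreach(sink):
    # Terminal stage: runs the side effect per item and returns how many ran.
    def run(items):
        count = 0
        for item in items:
            sink(item)
            count += 1
        return count
    return Stage(run)


sent = []  # stand-in for a real sink such as a vector DB client
n_batches = (
    range(10)
    | where(lambda n: n % 2 == 0)
    | select(lambda n: {"id": n})
    | batch(2)
    | foreach(sent.append)
)
```

Everything stays lazy until `foreach` pulls items through, so the whole input never has to sit in memory at once.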
---
This is especially absurd now that everyone’s doing RAG
With Retrieval-Augmented Generation (RAG) becoming the norm, we’re all parsing files, extracting embeddings, enriching metadata, batching, and inserting into vector stores.
Why are we all writing the same low-level, ad-hoc code to do this?
Shouldn’t this entire category of work be addressed by a proper DSL/framework?
Wouldn’t it make sense to build...
   - a functional DSL for JSON-to-embedding-to-API pipelines?
   - or a Python library with proper map, filter, batch, pipe, sink semantics?
   - or even a streaming runtime like Elixir Broadway or a minimal functional Rx-style graph?
Even R with dplyr has more elegant ways to express transformation than what we do in Python for these jobs.
---
Am I missing something?
Is there a tool, a language, or a framework out there that actually solves this well?
Or is this just one of those gaps in the tooling ecosystem that no one has filled yet?
Would love to hear what others are doing – and if anyone’s already working on a solution like this.
Thanks.
As some others already posted: There is more to copying data than just moving it. It is about observability. A lot of companies have created their own frameworks.
One example I find useful for a lot of use cases is Dagster. You can define resources to encapsulate complexity: https://docs.dagster.io/guides/build/external-resources In fact, there are also components built on top of custom DSLs: https://docs.dagster.io/guides/labs/components
At Magenta/Telekom we are building on https://georgheiler.com/event/magenta-data-architecture-25/ and you can follow along with this template: https://github.com/l-mds/local-data-stack/ You may find these examples useful for understanding how to use Dagster and graph-based data pipelines at scale.
Dagster is definitely one of the more polished orchestrators out there, and I like how it embraces the idea of resources and custom DSL-style components. It’s also cool that you’re using it at Magenta/Telekom at scale.
The only caution I have is that many orchestrators (Dagster, Airflow,...) are typically task-level or graph-level frameworks: they do a great job of linking up big stages of a pipeline, but I’ve found that you still end up writing quite a bit of manual or ad-hoc code. That’s not necessarily a knock on Dagster—it’s just the reality that orchestrators focus on coordinating tasks, rather than giving you a row-by-row DSL with robust side-effect semantics.
Still, those links you shared look super useful for seeing how Dagster can be extended with domain-specific abstractions. Appreciate you pointing those out — I’ll check them out
Many companies wrote their own in-house framework for ETL. For example, I have worked at two companies that both chose to write a sort of YAML-based ingestion framework so that users only need to write YAML.
I guess you can make the case that a general framework is useful, but TBH I think either it takes too long to include all use cases (and each company probably only needs one), or becomes too complicated to be a framework.
Not to discourage you though, it's fun to build your own project and I'd love you to try it out. Worst case, no one uses it but you bag a ton of experience.
I get why so many teams roll their own YAML-based system, as it feels more straightforward to build something tailored to your needs than to shoehorn the generalities in. But that’s exactly the tricky part: the moment you try to accommodate every possible use case, the framework either blows up in scope or ends up too niche to be widely adopted.
There's a lot you can do with jq to transform complex json into an intermediate format designed to be input for simpler Python processing.
My experience with Dagster has made me appreciative of plain, simple Python code and highly skeptical of frameworks.
The longer you stick with Python the more elegant you'll find your code. It takes time and trial and error to go from being annoyed with your code to being more comfortable with it and not looking for a tool or framework to be the silver bullet.
Thanks for sharing that perspective. I do think jq is great for slicing up JSON before handing it off to Python—especially if your transformations are primarily stateless and can be expressed as simple map/filters. And yes, a lot of frameworks can turn into "magic black boxes" that complicate what should be simple.
That said, my gripe is that when you move beyond "transform this data" and step into "transform and then push to some API, handle partial success, retries, etc.," the line-by-line or chunk-by-chunk side-effects logic in Python can get gnarly fast. That’s the part I wish there was a standard, declarative approach for—something that doesn’t require a big workflow orchestrator (like Dagster) but also doesn’t devolve into tons of imperative glue code.
I agree no tool is a silver bullet. In many cases, plain, well-structured Python is enough. But once you need concurrency, chunk-based error handling, or incremental streaming with side effects, you end up coding your own partial solution anyway—and that’s where I start wishing for a more composable pattern in the standard Python ecosystem. Until then, jq + Python is definitely a solid approach for the simpler jobs!
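To make the "chunk-based error handling" part concrete, here is a rough sketch of what I usually end up hand-rolling. It assumes a hypothetical `send` callable that raises on failure; `send_batches` and `flaky_send` are made-up names for illustration, and a real version would narrow the caught exception types and add concurrency.

```python
import time


def send_batches(batches, send, max_attempts=3, base_delay=0.0):
    """Send each batch with retries; return (sent, failed) instead of raising.

    `send` is a hypothetical callable that raises when a batch is rejected.
    """
    sent, failed = [], []
    for batch in batches:
        for attempt in range(1, max_attempts + 1):
            try:
                send(batch)
                sent.append(batch)
                break
            except Exception as exc:
                if attempt == max_attempts:
                    # Give up on this batch but keep processing the others.
                    failed.append((batch, exc))
                else:
                    # Exponential backoff between attempts.
                    time.sleep(base_delay * 2 ** (attempt - 1))
    return sent, failed


# Fake sink for illustration: "b" fails once, "c" always fails.
calls = {}


def flaky_send(batch):
    calls[batch] = calls.get(batch, 0) + 1
    if batch == "c" or (batch == "b" and calls[batch] == 1):
        raise ConnectionError(f"could not send {batch}")


sent, failed = send_batches(["a", "b", "c"], flaky_send)
```

The point is that partial success is a return value here, not an exception, so the caller decides what to do with the failed batches.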
I def agree that there is a pattern to most data pipelines:
- read from an input (source)
- perform some sort of processing
- write the data to some output (sink)
This may either be batch or continuous (stream). The inputs may change, the outputs may change.
I personally think that sql and duckdb are well positioned to do this. SQL is declarative, standardized and has decades worth of mature implementations.
The “source” can be modeled as a table.
The “sink” can also be modeled as a table.
What does a custom dsl provide over sql?
I have a side project called Sqlflow which is attempting to do something similar:
https://github.com/turbolytics/sql-flow
It’s not a DSL but the pipeline is standardized using the source, process, sink stages. Right now the process is pure sql but the source and sink are declarative. SQL has so much prior art, linters and a huge ecosystem with many practitioners.
The sticking point for me, though, is side effects. Once you need to call an external API (maybe to insert vector embeddings, send records to a SaaS service, or update some non-SQL store), you lose the comfortable ACID guarantees and pure SQL elegance. Even if you stage data in a DuckDB table, you still have to process each row or batch with an imperative approach. That’s where I start feeling the friction. SQL is brilliant for purely data-driven transformations; it doesn’t inherently solve "call this remote side-effect function in small batches, handle partial failures, and keep the pipeline consistent."
Can we unify those worlds? If your project, Sqlflow, manages to let folks stay mostly in SQL—while also elegantly handling side effects—that might be a huge step forward. For strictly data-focused workflows, I’m 100% on board that SQL alone is often the best "DSL" around. The complexity creeps in when we go from "write results to a table" to "call an external system" (possibly with partial commits, retries, or streaming needs). That’s usually where we end up rolling bespoke logic. If Sqlflow can bridge that gap, it’d be awesome. I’ll check it out—thanks for sharing.
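Here is a small sketch of the friction I mean, using the standard-library `sqlite3` module as a stand-in for DuckDB. The `records` table, the `sent` flag, and `push_to_api` are all assumptions for illustration. The selection is declarative, but the push and the bookkeeping around it are still an imperative loop.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE records (id INTEGER PRIMARY KEY, text TEXT, sent INTEGER DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO records (id, text) VALUES (?, ?)",
    [(i, f"doc {i}") for i in range(1, 6)],
)
conn.commit()

pushed = []


def push_to_api(rows):
    # Hypothetical external call; this fake rejects any batch containing id 3.
    if any(row_id == 3 for row_id, _ in rows):
        raise RuntimeError("remote rejected batch")
    pushed.extend(rows)


def drain(conn, batch_size=2):
    """Push unsent rows in batches; mark a batch as sent only after the call succeeds."""
    # Simplification: loads all pending rows up front instead of streaming them.
    pending = conn.execute(
        "SELECT id, text FROM records WHERE sent = 0 ORDER BY id"
    ).fetchall()
    for start in range(0, len(pending), batch_size):
        rows = pending[start:start + batch_size]
        try:
            push_to_api(rows)
        except RuntimeError:
            continue  # leave sent = 0 so a later run can retry this batch
        conn.executemany(
            "UPDATE records SET sent = 1 WHERE id = ?", [(r[0],) for r in rows]
        )
        conn.commit()


drain(conn)
unsent = [r[0] for r in conn.execute("SELECT id FROM records WHERE sent = 0 ORDER BY id")]
```

Note that the remote call and the `UPDATE` are not atomic: if the process dies between them, the batch gets pushed again on the next run, so the sink has to tolerate duplicates.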
Based on the averages, a DSL is going to suck and be a curse upon your future self/maintainers.
You provided a Clojure alternative, but without using the precise feature designed to alleviate this problem: transducers[0]. Transducers are the combination of functional composition and an indirection designed to decouple the data transformation pipeline from any specific source (e.g. async vs in-memory doesn't matter).
[0] https://clojure.org/reference/transducers
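Since the original post is about Python, here is a rough Python rendering of the transducer idea. It is only a sketch: it covers stateless map and filter steps, skips early termination and the completion step that stateful transducers such as partition-all need, and the names `mapping`, `filtering`, `compose`, and `transduce` are chosen for illustration.

```python
from functools import reduce


def mapping(f):
    # A transducer takes a reducing function and returns a new reducing function.
    return lambda step: lambda acc, x: step(acc, f(x))


def filtering(pred):
    return lambda step: lambda acc, x: step(acc, x) if pred(x) else acc


def compose(*xforms):
    # Data flows left to right, as with Clojure's (comp xf1 xf2 ...).
    def composed(step):
        for xf in reversed(xforms):
            step = xf(step)
        return step
    return composed


def transduce(xform, step, init, source):
    return reduce(xform(step), source, init)


def append(acc, x):
    acc.append(x)
    return acc


# The transformation is defined once, with no knowledge of source or sink.
xform = compose(
    filtering(lambda r: r["text"]),
    mapping(lambda r: r["text"].upper()),
)

records = [{"text": "a"}, {"text": ""}, {"text": "b"}]
as_list = transduce(xform, append, [], records)
total_len = transduce(xform, lambda acc, x: acc + len(x), 0, iter(records))
```

The same `xform` feeds a list in one call and a running total in the other, with a list as the source once and an iterator the second time. That decoupling is the property described above.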
Yeah, I agree. DSLs tend to become a maintenance burden over time! Transducers let you compose data transformations independently of the data source (in-memory, async, etc.), which sidesteps a lot of the pitfalls that a full DSL might introduce. Thanks so much for highlighting this!
my thoughts, exactly, for around 10 years, since my first serious data engineering projects. So many projects I worked on, all suffering this problem: complex individual data pipelines.
Sometimes I wrote small frameworks on my own, sometimes I used 3rd party tools. I also tried to create a universal framework for that, trying to solve exactly what you described. Never finished it, because I am more of the no-finishing guy.
However, at some point I came to the conclusion that it's simply not possible, which is nonsense and more of an excuse. As you said, it's just data in and data out, processed through some simple rules.
So I know the concept, but there are a million other projects in my head.
I know exactly what you mean. It’s the same cycle I’ve seen in countless projects, with every project spiraling into a brittle pipeline that’s hard to maintain. Despite all the promising ideas, we never seem to nail down a universal solution that’s both simple and robust. There must be a way, and sooner or later it will be figured out.
Write a macro DSL in Clojure for it then throw it up as a library.
To get to a higher level of abstraction than the general purpose languages you have to get more specific, maybe someone just hasn’t done it yet for your use case.
Can you clarify what angle of the existing solution you don’t like? The example you give of the Unix pipeline is pretty similar to what we do with pandas pipeline chains.
(this is just thrown together, untested, on mobile, and I’m not sure what your data looks like exactly, or how you’re transforming it, but that’s the general form)
Is it the syntax you don’t like, or your data is too large to fit in memory, or what aspect? This can certainly be made more concise if desired, if we want to wrap things in classes, decorators, context managers, overload the actual pipe operator, or other syntactic sugar mechanisms.
Most tools you find will be aimed at handling scale, and in the name of efficiency will therefore be in a columnar form, which is why you see a bunch of tools that do this already but only on tabular data. JSON is slow to work with beyond the initial load, so often you’d see it loaded into pandas or polars, maybe with minor transformation, then dumped to parquet files for some downstream tool to pick up and process.
These systems are also geared toward being robust. In a real world system there are all kinds of unexpected errors to handle. What happens if the API you’re pushing to is down, or starts rate limiting you, or someone added a new field or renamed a field or your two data sets don’t have matching keys? Real pipelines have to account for this, and depending on your scenario you might want to retry, or roll back changes, and this can get ridiculously complicated because in API land you often don’t have any analogous concept of transactions and rollback like those that exist in SQL, so without a distributed transaction coordinator (its own complexity), streaming data will lead to distributed systems drifting out of sync. And now you need a batch job on top of your streaming data to ensure it hasn’t fallen out of sync (so in reality, unless you have heavy complexity needs, just use SQL so you have transactions, or just use batch jobs).
But that’s the reason you don’t see this record-based fire-and-forget approach, because it’s too slow and lacks error handling, and why instead everything is columnar and heavyweight (with DAGs and workflow orchestrators etc).
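As a rough illustration of the "batch job on top of your streaming data" idea, the simplest form of such a check is a set difference over IDs. This is only a sketch: `reconcile` is a made-up name, and it assumes both sides can list their IDs cheaply, which is not a given for every external API.

```python
def reconcile(source_ids, sink_ids):
    """Return (missing, orphaned): IDs to re-send and IDs to remove from the sink."""
    source_ids, sink_ids = set(source_ids), set(sink_ids)
    return sorted(source_ids - sink_ids), sorted(sink_ids - source_ids)


# Illustrative data: "a" never arrived in the sink, "x" no longer exists upstream.
missing, orphaned = reconcile(["a", "b", "c"], ["b", "c", "x"])
```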
Maybe I’m missing your point, but your Python code could definitely be as concise as any of the other examples you gave.
Thanks for laying that out. You raise great points: Pandas (or Polars) chaining is indeed closer to what we want than raw imperative loops, and it handles a lot of the transformations. But it still assumes an in-memory, columnar approach, which doesn’t always gel with row-by-row side effects like pushing data to an external API. Also, real-world systems do require robust error handling and partial retries, which is exactly the scenario we’d love a functional pipeline to address by design, rather than patching in big try/except blocks. Tools like DAG orchestrators solve that at a workflow level, but not so much at the record/stream level.
So that’s the core frustration: yes, we can hack it in Python with a pipeline style, yes we can load data into DataFrames and transform it, but as soon as we do side-effectful calls (like collection.add(...) per record or batch), we have to drop out of the nice .pipe() flow. For me, it’s less about syntax and more about a missing standard approach to composable streams + side effects. Elixir Broadway, or even a DSL in Lisp/Clojure, addresses that more natively.
“Just use batch jobs / just use SQL transactions” works only if your external system supports transactions or if you keep your data in a single database. But many people use specialized systems (e.g. vector DBs, text embeddings) that don’t have an ACID-like model. Tools like Elixir Broadway target exactly that gap for streaming: concurrency, batching, backpressure, built-in retry, etc. That’s part of the reason we’re exploring new paradigms, rather than simply reading into Pandas or writing a standard “for line in file” script.
In short, I’m not saying Pandas can’t be coerced into something close to a pipeline. It’s more that we lack a standard, built-in, composable “stream of records -> transform -> side-effect-> handle errors” solution. Once you leave the purely tabular, in-memory paradigm, or need partial success handling, you quickly outgrow the typical DataFrame approach. That’s why I find frameworks like Elixir Broadway, or even an RxJS-like pipeline, so appealing — they make chunk-based side effects a first-class citizen.
So the question is: do we keep building one-off solutions in Python, or do we push for a more standardized, composable approach that can handle both transformations and side effects gracefully?
Have you had a look at snakemake or nextflow?
No, I haven’t looked into Snakemake or Nextflow yet, but they seem intriguing. Thanks for the suggestion!
as you’ve listed, there are answers within reach but switching costs are enormous and the labor market is locked into a set of known technologies that generate salaries, a low energy state that would require substantial energy investment in reskilling, but there’s no slack energy in the system for that and even if there was, the transition will have winners and losers, redistribute jobs, and if there’s one thing the labor mkt is terrified of it’s losing one’s job. disruptive technology shifts can be career ending for a lot of people … bottom line is that getting paid to shovel python slop has been historically lucrative
I think you’re right to point out the inertia in the labor market and the structural energy required for paradigm shifts.
What worries me most, though, is that because of exactly the reasons you’ve described — and now, with the rise of GenAI that makes generating boilerplate imperative code even easier — we might be permanently missing the opportunity to invent a better, more elegant solution.
Instead of challenging the underlying paradigm, we’re accelerating its replication — just with fewer keystrokes. GenAI is helping us pave cow paths, not design highways.
My fear is that the convenience of generating "good enough" code at scale will solidify suboptimal workflows, and the space for reimagining them might quietly close forever.
life finds a way
someone really should have asked dr malcolm to cite his sources