Background jobs & workflows, code-first
Klassd.Workflows is a code-first, NuGet-distributed background-job and workflow engine for .NET. Jobs are plain C# classes; the scheduler runs each one as its own Kubernetes pod in production and as a local process in dev — the same worker either way. Compose jobs into DAG workflows and watch them run live.
What you get
Code-first jobs
A job is a C# class implementing IJob — no attributes-as-config, no YAML. The engine discovers it by type name.
Runs in its own pod
Each execution is a batch/v1 Kubernetes Job (one pod), with per-job CPU/memory requests and limits resolved from the [JobResources] attribute plus configuration.
Same worker, local too
In dev the same worker runs as a child process — no cluster needed. Switch to Kubernetes with one config setting.
DAG workflows
Compose jobs into a graph: dependencies, fan-out (one pod per item), conditional nodes, retries, and artifact passing between nodes.
Live dashboard
A Blazor Server UI shows the job list, per-job console + progress, recurring jobs, and an SVG view of each DAG run.
Durable & pluggable
Swap the job store (in-memory / PostgreSQL / MongoDB) and the artifact store (filesystem / S3 / GCS) — or ship your own adapter.
Quickstart
Install the core plus the adapters you need. While Klassd.Workflows is in beta the packages are prerelease:
dotnet add package Klassd.Workflows.Core --prerelease
dotnet add package Klassd.Workflows.Storage.Postgres --prerelease # durable store (or .Storage.MongoDb)
dotnet add package Klassd.Workflows.Kubernetes --prerelease # K8s executor (omit for local only)
dotnet add package Klassd.Workflows.Artifacts.S3 --prerelease # artifact store (or .Artifacts.Gcs)
1. Define a job
Any class implementing IJob is a unit of work. Use the IJobContext to log, report progress, and read arguments:
public sealed class MyJob : IJob
{
    public async Task RunAsync(IJobContext ctx)
    {
        ctx.Log("starting");
        ctx.ReportProgress(50, "halfway");
        await Task.Delay(1000, ctx.CancellationToken);
        ctx.Log("done");
    }
}
2. Wire it up in Program.cs
AddKlassdWorkflowsCore() returns a builder you use to pick a durable store; pick an executor separately:
var workflows = builder.Services.AddKlassdWorkflowsCore();
workflows.UsePostgres("Host=…;Database=…;Username=…;Password=…"); // or .UseMongo(...) / in-memory
builder.Services.AddKubernetesExecutor(builder.Configuration); // or AddLocalExecutor(workerDll)
3. Run
Enqueue jobs from code, or open the dashboard to start/stop them and watch live console output.
Scheduling
Fire a job now, or register a recurring job with a cron expression (parsed by Cronos):
scheduler.AddOrUpdateRecurring<MyJob>("nightly", "0 2 * * *"); // cron: daily at 02:00
await scheduler.EnqueueAsync<MyJob>(); // fire now
Recurring workflows are registered the same way with AddOrUpdateRecurringWorkflow(id, name, cron). Pod resources (CPU/memory requests & limits) are set per job with a [JobResources] attribute and can be retuned from config without a recompile.
Workflows (DAGs)
Jobs compose into a directed acyclic graph that fans out, waits on dependencies and passes data between nodes. The orchestrator runs in the scheduler; each node runs as a normal worker pod, so every node has its own live console.
registry.Register(new WorkflowBuilder("catalog-integration")
    .Add<MarketFinderJob>("markets") // root: emits "market_ids"
    .Add<DataProxyJob>("data-proxy") // parallel root: writes an artifact
    .Add<IntegrationJob>("integration", n => n
        .DependsOn("markets", "data-proxy")
        .FanOutOver("markets", "market_ids", itemArgument: "market")) // one pod per market
    .Add<PublishJob>("publish", n => n.DependsOn("integration").WithRetries(2))
    .Add<FinalizerJob>("finalizer", n => n
        .DependsOn("publish", "data-proxy")
        .BindInput("dataset_ref", "data-proxy", "dataset_ref")) // reads the artifact
    .Build());
- Dependencies: a node starts once all its dependencies are satisfied; a failed dependency skips dependents.
- Fan-out: read an upstream output as a JSON array and start one execution per element.
- Conditions: run a node only when a predicate over upstream outputs holds (otherwise it is omitted without failing the run).
- Retries: re-run a failed execution up to n times, per fan-out item.
- Artifacts: large payloads pass through an IArtifactStore; a node saves an artifact and publishes the small reference downstream.
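The dependency, fan-out and retry rules listed above can be sketched in plain C#, independent of the engine. This is a simplified illustration under stated assumptions, not how Klassd.Workflows implements its orchestrator: the helper names (FanOutItems, RunWithRetries, RunGraph) and the status strings are hypothetical, nodes run sequentially in-process rather than as pods, and retries are attached to the fan-out node purely for demonstration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

// Fan-out: read an upstream output as a JSON array, one item per element.
static List<string> FanOutItems(string upstreamOutputJson) =>
    JsonSerializer.Deserialize<List<string>>(upstreamOutputJson) ?? new List<string>();

// Retries: one initial attempt plus up to `retries` re-runs of the same execution.
static bool RunWithRetries(Func<int, bool> attempt, int retries)
{
    for (var attemptNo = 0; attemptNo <= retries; attemptNo++)
    {
        if (attempt(attemptNo)) return true;
    }
    return false;
}

// Dependencies: visit nodes in declaration order (dependencies are assumed to be
// listed before their dependents). A node runs only when every dependency
// succeeded; a failed or skipped dependency causes it to be skipped.
static Dictionary<string, string> RunGraph(
    (string Id, string[] DependsOn)[] graph,
    Func<string, bool> runNode)
{
    var state = new Dictionary<string, string>();
    foreach (var (id, dependsOn) in graph)
    {
        var ready = dependsOn.All(d => state[d] == "Succeeded");
        state[id] = !ready ? "Skipped" : (runNode(id) ? "Succeeded" : "Failed");
    }
    return state;
}

// Same shape as the "catalog-integration" workflow above.
var nodes = new (string Id, string[] DependsOn)[]
{
    ("markets", Array.Empty<string>()),
    ("data-proxy", Array.Empty<string>()),
    ("integration", new[] { "markets", "data-proxy" }),
    ("publish", new[] { "integration" }),
    ("finalizer", new[] { "publish", "data-proxy" }),
};

// Pretend "markets" emitted this JSON array as its "market_ids" output.
var markets = FanOutItems("[\"se\", \"no\", \"dk\"]");
var attemptsPerMarket = new Dictionary<string, int>();

// Run 1: "integration" fans out over the markets; the "no" item fails once, then succeeds.
var status = RunGraph(nodes, id =>
{
    if (id != "integration") return true;
    return markets.All(market => RunWithRetries(attemptNo =>
    {
        attemptsPerMarket[market] = attemptNo + 1;
        return market != "no" || attemptNo > 0;
    }, retries: 2));
});

// Run 2: "integration" fails outright, so everything downstream is skipped.
var failedRun = RunGraph(nodes, id => id != "integration");

foreach (var entry in status) Console.WriteLine($"run 1 {entry.Key}: {entry.Value}");
foreach (var entry in failedRun) Console.WriteLine($"run 2 {entry.Key}: {entry.Value}");
```

In the first run every node succeeds and only the "no" item consumes a retry. In the second run the failed "integration" node causes "publish" to be skipped, and "finalizer" is skipped in turn because one of its dependencies never succeeded.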
Executors: local & Kubernetes
The same worker runs locally and in the cluster — only the executor that launches it differs. Communication is a line protocol on stdout, so Kubernetes pod logs are the transport for free.
- Local: AddLocalExecutor(...) launches the worker as a child process per job. No cluster required; ideal for dev.
- Kubernetes: AddKubernetesExecutor(...) creates a batch/v1 Job (one pod, restartPolicy: Never) per execution, tails its logs, and cleans up via ttlSecondsAfterFinished. Stopping a job deletes the Job; SIGTERM cancels the worker's token.
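To make the last point concrete, here is a minimal sketch of how a .NET process can turn SIGTERM into a cancelled token, using the standard PosixSignalRegistration API (.NET 6 and later). How the Klassd worker wires this internally is not shown in this document, so treat the structure below, including the hypothetical DoWorkAsync stand-in for a job body, as an assumption.

```csharp
using System;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();

// Kubernetes sends SIGTERM to the container when the Job is deleted.
// Cancel the token rather than letting the runtime end the process immediately.
var sigterm = PosixSignalRegistration.Create(PosixSignal.SIGTERM, context =>
{
    context.Cancel = true; // suppress default termination so the job can wind down
    cts.Cancel();
});

// Hypothetical stand-in for a job body: cooperative work that observes the token.
static async Task<int> DoWorkAsync(CancellationToken token)
{
    var completed = 0;
    try
    {
        for (var step = 0; step < 3; step++)
        {
            await Task.Delay(50, token);
            completed++;
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("cancelled, cleaning up");
    }
    return completed;
}

var completedSteps = await DoWorkAsync(cts.Token);
Console.WriteLine($"completed {completedSteps} step(s)");

sigterm.Dispose(); // unregister the handler
```

Inside a job, the same effect is achieved by passing ctx.CancellationToken to every awaitable call, as the MyJob example does with Task.Delay.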
Storage & artifacts
Two pluggable seams, both with built-in adapters and open for your own:
- Job store (IJobStore): holds executions, recurring entries and workflow runs. In-memory by default; UsePostgres(...) or UseMongo(...) for durability.
- Artifact store (IArtifactStore): holds large payloads between nodes. The worker selects a provider by name at runtime (file, s3, gcs), so the choice is per-deployment, not compiled in.
Add your own by implementing the interface (plus an IArtifactStoreProvider for artifacts) — exactly how the Postgres/Mongo and S3/GCS packages do it.
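The artifact-passing pattern (save the payload once, hand only a small reference downstream) can be sketched against the local filesystem. The members of IArtifactStore are not listed in this document, so the SaveArtifact and LoadArtifact helpers below are hypothetical and do not mirror the real interface; they only illustrate the idea of a reference standing in for the payload.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical helper: persist a payload under a key and return that key as the reference.
static string SaveArtifact(string storeDir, string key, byte[] payload)
{
    Directory.CreateDirectory(storeDir);
    File.WriteAllBytes(Path.Combine(storeDir, key), payload);
    return key; // only this small string travels between nodes
}

// Hypothetical helper: resolve a reference back to the stored payload.
static byte[] LoadArtifact(string storeDir, string reference) =>
    File.ReadAllBytes(Path.Combine(storeDir, reference));

// The store location is deployment configuration, shared by both nodes.
var root = Path.Combine(Path.GetTempPath(), "artifact-sketch-" + Guid.NewGuid().ToString("N"));

// Upstream node ("data-proxy" in the workflow example): save, then publish the reference.
var payload = Encoding.UTF8.GetBytes(new string('x', 1_000_000));
var datasetRef = SaveArtifact(root, "dataset.bin", payload);

// Downstream node ("finalizer"): receives the reference as an input and loads the payload.
var loaded = LoadArtifact(root, datasetRef);
Console.WriteLine($"reference '{datasetRef}' resolves to {loaded.Length} bytes");

Directory.Delete(root, recursive: true); // clean up the temporary store
```

Because downstream nodes see only the reference, the same workflow definition can run against a filesystem, S3 or GCS store, with the provider chosen per deployment.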
Packages
Install the core plus the adapters you need — each keeps its SDK isolated, so you only pull in what you wire up. While in beta, add --prerelease.
| Package | Purpose |
|---|---|
| Klassd.Workflows.Abstractions | The contract jobs implement: IJob, IJobContext, the IArtifactStore seam, and the worker stdout protocol. No dependencies. |
| Klassd.Workflows.Core | Scheduler, in-memory store, cron recurring loop (Cronos), job catalog, DAG orchestrator, filesystem artifact store, and the local-process executor. |
| Klassd.Workflows.Kubernetes | KubernetesJobExecutor: creates a batch/v1 Job per run and tails the pod logs. AddKubernetesExecutor(). |
| Klassd.Workflows.Storage.Postgres | Durable IJobStore on PostgreSQL (jsonb documents + append-only logs). WorkflowsBuilder.UsePostgres(). |
| Klassd.Workflows.Storage.MongoDb | Durable IJobStore on MongoDB. WorkflowsBuilder.UseMongo(). |
| Klassd.Workflows.Artifacts.S3 | IArtifactStore on S3 / S3-compatible stores (provider name "s3") for large payloads passed between nodes. |
| Klassd.Workflows.Artifacts.Gcs | IArtifactStore on Google Cloud Storage (provider name "gcs"). |