Workflows

Background jobs & workflows, code-first

Klassd.Workflows is a code-first, NuGet-distributed background-job and workflow engine for .NET. Jobs are plain C# classes; the scheduler runs each one as its own Kubernetes pod in production and as a local process in dev — the same worker either way. Compose jobs into DAG workflows and watch them run live.

A companion to Klassd, the code-first headless CMS: same philosophy, separate package.

What you get

Code-first jobs

A job is a C# class implementing IJob — no attributes-as-config, no YAML. The engine discovers it by type name.

Runs in its own pod

Each execution is a batch/v1 Kubernetes Job (one pod), with per-job CPU/memory requests and limits resolved from the [JobResources] attribute plus configuration.

Same worker, local too

In dev the same worker runs as a child process — no cluster needed. Switch to Kubernetes with one config setting.

DAG workflows

Compose jobs into a graph: dependencies, fan-out (one pod per item), conditional nodes, retries, and artifact passing between nodes.

Live dashboard

A Blazor Server UI shows the job list, per-job console + progress, recurring jobs, and an SVG view of each DAG run.

Durable & pluggable

Swap the job store (in-memory / PostgreSQL / MongoDB) and the artifact store (filesystem / S3 / GCS) — or ship your own adapter.

Quickstart

Install the core plus the adapters you need. While Klassd.Workflows is in beta the packages are prerelease:

dotnet add package Klassd.Workflows.Core --prerelease
dotnet add package Klassd.Workflows.Storage.Postgres --prerelease   # durable store (or .Storage.MongoDb)
dotnet add package Klassd.Workflows.Kubernetes --prerelease         # K8s executor (omit for local only)
dotnet add package Klassd.Workflows.Artifacts.S3 --prerelease       # artifact store (or .Artifacts.Gcs)

1. Define a job

Any class implementing IJob is a unit of work. Use the IJobContext to log, report progress, and read arguments:

public sealed class MyJob : IJob
{
    public async Task RunAsync(IJobContext ctx)
    {
        ctx.Log("starting");
        ctx.ReportProgress(50, "halfway");
        await Task.Delay(1000, ctx.CancellationToken);
        ctx.Log("done");
    }
}

2. Wire it up in Program.cs

AddKlassdWorkflowsCore() returns a builder you use to pick a durable store; pick an executor separately:

var workflows = builder.Services.AddKlassdWorkflowsCore();
workflows.UsePostgres("Host=…;Database=…;Username=…;Password=…");  // or .UseMongo(...) / in-memory

builder.Services.AddKubernetesExecutor(builder.Configuration);     // or AddLocalExecutor(workerDll)

3. Run

Enqueue jobs from code, or open the dashboard to start/stop them and watch live console output.

Scheduling

Fire a job now, or register a recurring job with a cron expression (parsed by Cronos):

scheduler.AddOrUpdateRecurring<MyJob>("nightly", "0 2 * * *");   // cron: daily at 02:00
await scheduler.EnqueueAsync<MyJob>();                            // fire now

Recurring workflows are registered the same way with AddOrUpdateRecurringWorkflow(id, name, cron). Pod resources (CPU/memory requests & limits) are set per job with a [JobResources] attribute and can be retuned from config without a recompile.
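
To sanity-check a cron expression before registering it, you can evaluate it with Cronos directly, since that is the parser named above. The sketch below is independent of Klassd.Workflows and assumes only that the Cronos NuGet package is referenced. The start time is an arbitrary example, and evaluating in UTC is an assumption: this page does not say which time zone the scheduler applies to recurring jobs.

```csharp
using System;
using Cronos;

// Standard five-field cron: minute, hour, day-of-month, month, day-of-week.
// "0 2 * * *" means minute 0 of hour 2, every day.
CronExpression expression = CronExpression.Parse("0 2 * * *");

// Cronos requires a UTC DateTime here (Kind must be DateTimeKind.Utc).
DateTime from = new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc);
DateTime? next = expression.GetNextOccurrence(from);

Console.WriteLine(next?.ToString("u"));   // 2024-01-02 02:00:00Z
```

Starting from noon on 1 January, the next match is 02:00 the following day, which is what a "nightly" schedule should produce.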

Workflows (DAGs)

Jobs compose into a directed acyclic graph that fans out, waits on dependencies and passes data between nodes. The orchestrator runs in the scheduler; each node runs as a normal worker pod, so every node has its own live console.

registry.Register(new WorkflowBuilder("catalog-integration")
    .Add<MarketFinderJob>("markets")                       // root: emits "market_ids"
    .Add<DataProxyJob>("data-proxy")                       // parallel root: writes an artifact
    .Add<IntegrationJob>("integration", n => n
        .DependsOn("markets", "data-proxy")
        .FanOutOver("markets", "market_ids", itemArgument: "market"))   // one pod per market
    .Add<PublishJob>("publish", n => n.DependsOn("integration").WithRetries(2))
    .Add<FinalizerJob>("finalizer", n => n
        .DependsOn("publish", "data-proxy")
        .BindInput("dataset_ref", "data-proxy", "dataset_ref"))          // reads the artifact
    .Build());

  • Dependencies: a node starts once all its dependencies are satisfied; a failed dependency skips dependents.
  • Fan-out: read an upstream output as a JSON array and start one execution per element.
  • Conditions: run a node only when a predicate over upstream outputs holds; otherwise the node is skipped without counting as a failure.
  • Retries: re-run a failed execution up to n times; with fan-out, the limit applies to each item separately.
  • Artifacts: large payloads pass through an IArtifactStore; a node saves an artifact and publishes the small reference downstream.
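
The dependency rule can be illustrated with a small, self-contained sketch. This is not the Klassd.Workflows orchestrator: NodeState, DagSketch, and the run callback are hypothetical names invented for the example, and it runs nodes one at a time, whereas the real engine launches pods. It only shows how "start when all dependencies succeeded, skip when one failed" plays out on the graph from the example above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same shape as the "catalog-integration" example: node name -> dependencies.
var graph = new Dictionary<string, string[]>
{
    ["markets"] = Array.Empty<string>(),
    ["data-proxy"] = Array.Empty<string>(),
    ["integration"] = new[] { "markets", "data-proxy" },
    ["publish"] = new[] { "integration" },
    ["finalizer"] = new[] { "publish", "data-proxy" },
};

// Pretend "integration" fails and every other node succeeds.
var result = DagSketch.Execute(graph, name => name != "integration");

foreach (var (node, state) in result)
    Console.WriteLine($"{node}: {state}");

// Hypothetical types for illustration only; not part of Klassd.Workflows.
enum NodeState { Pending, Succeeded, Failed, Skipped }

static class DagSketch
{
    // Resolves every node to a final state. `run` stands in for launching a
    // worker and returns true on success. Every dependency must be a known node.
    public static Dictionary<string, NodeState> Execute(
        IReadOnlyDictionary<string, string[]> dependsOn,
        Func<string, bool> run)
    {
        var state = dependsOn.Keys.ToDictionary(n => n, _ => NodeState.Pending);

        while (state.Values.Contains(NodeState.Pending))
        {
            bool progressed = false;
            foreach (var node in dependsOn.Keys)
            {
                if (state[node] != NodeState.Pending) continue;
                var deps = dependsOn[node];

                if (deps.Any(d => state[d] is NodeState.Failed or NodeState.Skipped))
                {
                    // A failed (or already skipped) dependency skips the dependent.
                    state[node] = NodeState.Skipped;
                    progressed = true;
                }
                else if (deps.All(d => state[d] == NodeState.Succeeded))
                {
                    state[node] = run(node) ? NodeState.Succeeded : NodeState.Failed;
                    progressed = true;
                }
            }

            // No node could move forward: the graph is not acyclic.
            if (!progressed) throw new InvalidOperationException("Cycle detected.");
        }

        return state;
    }
}
```

With "integration" failing, "markets" and "data-proxy" end as Succeeded, while "publish" and "finalizer" end as Skipped because the failure propagates through the dependency chain.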

Executors: local & Kubernetes

The same worker runs locally and in the cluster; only the executor that launches it differs. The worker reports back over a line protocol on stdout, so on Kubernetes the pod log stream doubles as the transport and no extra channel is needed.

  • Local: AddLocalExecutor(...) launches the worker as a child process per job. No cluster required; ideal for dev.
  • Kubernetes: AddKubernetesExecutor(...) creates a batch/v1 Job (one pod, restartPolicy: Never) per execution, tails its logs, and cleans up via ttlSecondsAfterFinished. Stopping a job deletes the Job; SIGTERM cancels the worker's token.
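
For orientation, a Job of the kind described above might look roughly like the manifest below. This is a sketch, not the output of the executor: the Job name, image, TTL, and resource values are all hypothetical, and backoffLimit: 0 is an assumption made here to keep the "one pod" property, since this page does not say how the executor sets it.

```yaml
# Illustrative manifest only; names, image, and values are hypothetical.
apiVersion: batch/v1
kind: Job
metadata:
  name: myjob-run-0001            # hypothetical: one Job object per execution
spec:
  ttlSecondsAfterFinished: 300    # hypothetical value; the finished Job is deleted after this
  backoffLimit: 0                 # assumption: no Kubernetes-level retries, so exactly one pod
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: worker
          image: registry.example.com/my-worker:latest   # hypothetical image
          resources:
            requests:
              cpu: 250m
              memory: 256Mi
            limits:
              cpu: "1"
              memory: 512Mi
```

Deleting such a Job removes its pod, and Kubernetes sends the container SIGTERM before terminating it, which is the signal the worker turns into token cancellation.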

Storage & artifacts

Two pluggable seams, both with built-in adapters and open for your own:

  • Job store (IJobStore) — holds executions, recurring entries and workflow runs. In-memory by default; UsePostgres(...) or UseMongo(...) for durability.
  • Artifact store (IArtifactStore) — holds large payloads between nodes. The worker selects a provider by name at runtime (file, s3, gcs), so the choice is per-deployment, not compiled in.

Add your own by implementing the interface (plus an IArtifactStoreProvider for artifacts) — exactly how the Postgres/Mongo and S3/GCS packages do it.
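
The two ideas in this section, choosing a provider by name at runtime and passing a small reference instead of the payload, can be sketched without the real interfaces. Everything below is hypothetical: ISketchStore, FileSketchStore, the SKETCH_ARTIFACT_PROVIDER variable, and the "file:" reference format are invented for illustration, and the actual IArtifactStore and IArtifactStoreProvider contracts may look quite different.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Providers are looked up by name at runtime, as a deployment setting would do.
var providers = new Dictionary<string, Func<ISketchStore>>(StringComparer.OrdinalIgnoreCase)
{
    ["file"] = () => new FileSketchStore(Path.Combine(Path.GetTempPath(), "artifacts-sketch")),
    // An "s3" or "gcs" factory would be registered by its own package.
};

// Hypothetical setting name; falls back to the filesystem provider.
string providerName = Environment.GetEnvironmentVariable("SKETCH_ARTIFACT_PROVIDER") ?? "file";
ISketchStore store = providers[providerName]();

// Producer node: save the large payload, publish only the small reference.
string reference = store.Save("dataset.json", "[1,2,3]");

// Consumer node: resolve the reference back into the payload.
string payload = store.Load(reference);
Console.WriteLine($"{reference} -> {payload.Length} chars");

// Hypothetical interface for illustration; the real IArtifactStore may differ.
interface ISketchStore
{
    string Save(string name, string content);   // returns a reference to the stored payload
    string Load(string reference);
}

sealed class FileSketchStore : ISketchStore
{
    private readonly string _root;

    public FileSketchStore(string root)
    {
        _root = root;
        Directory.CreateDirectory(root);
    }

    public string Save(string name, string content)
    {
        File.WriteAllText(Path.Combine(_root, name), content);
        return "file:" + name;
    }

    public string Load(string reference) =>
        File.ReadAllText(Path.Combine(_root, reference.Substring("file:".Length)));
}
```

Because only the reference string travels between nodes, swapping the filesystem for an object store changes the factory registration, not the job code.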

Packages

Install the core plus the adapters you need — each keeps its SDK isolated, so you only pull in what you wire up. While in beta, add --prerelease.

Package                             Purpose
Klassd.Workflows.Abstractions       The contract jobs implement: IJob, IJobContext, the IArtifactStore seam, and the worker stdout protocol. No dependencies.
Klassd.Workflows.Core               Scheduler, in-memory store, cron recurring loop (Cronos), job catalog, DAG orchestrator, filesystem artifact store, and the local-process executor.
Klassd.Workflows.Kubernetes         KubernetesJobExecutor: creates a batch/v1 Job per run and tails the pod logs. Registered with AddKubernetesExecutor().
Klassd.Workflows.Storage.Postgres   Durable IJobStore on PostgreSQL (jsonb documents + append-only logs). Enabled with WorkflowsBuilder.UsePostgres().
Klassd.Workflows.Storage.MongoDb    Durable IJobStore on MongoDB. Enabled with WorkflowsBuilder.UseMongo().
Klassd.Workflows.Artifacts.S3       IArtifactStore on S3 / S3-compatible stores (provider name "s3") for large payloads passed between nodes.
Klassd.Workflows.Artifacts.Gcs      IArtifactStore on Google Cloud Storage (provider name "gcs").
