Custom Task Queue Backends

Implement and register a custom TaskQueue or StreamChannel backend to use NATS, SQS, Kafka, or any other message broker with ark-operator.

The task queue is the boundary between the operator and agent pods. By default it uses Redis Streams with consumer groups. You can replace it with any message broker by implementing the TaskQueue and StreamChannel interfaces.


The interfaces

// internal/agent/queue/queue.go

type TaskQueue interface {
    Submit(ctx context.Context, prompt string, meta map[string]string) (string, error)
    Poll(ctx context.Context) (*Task, error)
    Ack(task Task, result string, usage TokenUsage) error
    Nack(task Task, reason string) error
    Results(ctx context.Context, taskIDs []string) ([]TaskResult, error)
    Close()
}

type StreamChannel interface {
    Publish(key, chunk string) error
    Done(key string) error
    Read(ctx context.Context, key string) (string, error)
}

TaskQueue handles task lifecycle: submit a task, poll for work, acknowledge completion or failure, and retrieve results.

StreamChannel handles streaming output: publish chunks as the LLM generates them, signal completion, and read the assembled output. The dashboard uses this for live log streaming.

The two interfaces can share the same backend (the Redis adapter does) or use different backends — controlled by TASK_QUEUE_URL and STREAM_CHANNEL_URL separately.


Step 1: Implement the interfaces

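// internal/agent/queue/nats/queue.go
package nats

import (
    "context"
    "errors"

    "github.com/arkonis-dev/ark-operator/internal/agent/queue"
    natsgo "github.com/nats-io/nats.go"
)

// errNotImplemented is returned by the methods you still need to fill in,
// so the skeleton compiles before the NATS logic exists.
var errNotImplemented = errors.New("nats queue: not implemented")

type NATSQueue struct {
    conn *natsgo.Conn
    js   natsgo.JetStreamContext
}

// Compile-time check that *NATSQueue satisfies queue.TaskQueue.
var _ queue.TaskQueue = (*NATSQueue)(nil)

func New(url string) (*NATSQueue, error) {
    conn, err := natsgo.Connect(url)
    if err != nil {
        return nil, err
    }
    js, err := conn.JetStream()
    if err != nil {
        conn.Close()
        return nil, err
    }
    return &NATSQueue{conn: conn, js: js}, nil
}

func (q *NATSQueue) Submit(ctx context.Context, prompt string, meta map[string]string) (string, error) {
    // Publish to a subject and return the task ID.
    return "", errNotImplemented
}

func (q *NATSQueue) Poll(ctx context.Context) (*queue.Task, error) {
    // Subscribe and fetch the next message.
    return nil, errNotImplemented
}

func (q *NATSQueue) Ack(task queue.Task, result string, usage queue.TokenUsage) error {
    // Store the result and ack the message.
    return errNotImplemented
}

func (q *NATSQueue) Nack(task queue.Task, reason string) error {
    // Nack (or re-queue) the message.
    return errNotImplemented
}

func (q *NATSQueue) Results(ctx context.Context, taskIDs []string) ([]queue.TaskResult, error) {
    // Fetch results by ID from your store.
    return nil, errNotImplemented
}

func (q *NATSQueue) Close() {
    q.conn.Close()
}

// Register on import. Unwrap New's results so a failed connection yields a
// nil interface rather than a non-nil TaskQueue holding a nil *NATSQueue.
func init() {
    queue.RegisterQueue("nats", func(url string) (queue.TaskQueue, error) {
        q, err := New(url)
        if err != nil {
            return nil, err
        }
        return q, nil
    })
}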

Step 2: Register the backend

Blank-import your package in the operator entrypoint:

// cmd/main.go
import (
    _ "github.com/arkonis-dev/ark-operator/internal/agent/queue/nats"
)

Do the same in the agent runtime entrypoint (runtime/agent/main.go) — both the operator and the agent pods need to know how to connect.


Step 3: Configure via URL scheme

The backend is selected from the URL scheme in TASK_QUEUE_URL. For a NATS backend:

TASK_QUEUE_URL=nats://nats.messaging.svc.cluster.local:4222

Set it in Helm:

helm upgrade ark-operator arkonis/ark-operator \
  --set taskQueueURL=nats://nats.messaging.svc.cluster.local:4222

If STREAM_CHANNEL_URL is not set, it defaults to TASK_QUEUE_URL (same backend for both task queue and stream channel).
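The selection logic in this step can be sketched as follows. resolveURLs, schemeOf, and resolveFromEnv are hypothetical helpers, and treating an empty TASK_QUEUE_URL as an error is an assumption rather than documented behavior. Note that Go's url.Parse lowercases the scheme, so NATS:// and nats:// resolve to the same key.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
)

// resolveURLs applies the documented fallback: STREAM_CHANNEL_URL defaults to
// TASK_QUEUE_URL. Rejecting an empty TASK_QUEUE_URL is an assumption.
func resolveURLs(getenv func(string) string) (queueURL, streamURL string, err error) {
	queueURL = getenv("TASK_QUEUE_URL")
	if queueURL == "" {
		return "", "", fmt.Errorf("TASK_QUEUE_URL is not set")
	}
	streamURL = getenv("STREAM_CHANNEL_URL")
	if streamURL == "" {
		streamURL = queueURL
	}
	return queueURL, streamURL, nil
}

// schemeOf returns the URL scheme, the key a backend registers under.
func schemeOf(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	if u.Scheme == "" {
		return "", fmt.Errorf("%q has no scheme", raw)
	}
	return u.Scheme, nil
}

// resolveFromEnv reads the real process environment.
func resolveFromEnv() (string, string, error) {
	return resolveURLs(os.Getenv)
}
```

Passing getenv as a function keeps the fallback testable without touching the real environment.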


Built-in backends

| Scheme | Backend | Interfaces |
|---|---|---|
| redis://, rediss:// | Redis Streams | TaskQueue + StreamChannel |
| memory:// | In-process | TaskQueue only (used by ark CLI) |

RegisterQueue and RegisterStream

func init() {
    // Register a TaskQueue backend
    queue.RegisterQueue("myscheme", func(url string) (queue.TaskQueue, error) {
        // Return a pointer so pointer-receiver methods satisfy the interface.
        return &MyQueue{url: url}, nil
    })

    // Register a StreamChannel backend (if different from queue)
    queue.RegisterStream("myscheme", func(url string) (queue.StreamChannel, error) {
        return &MyStream{url: url}, nil
    })
}

Both functions panic if the scheme is already registered — register each scheme once from an init().
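For intuition about what RegisterQueue does, here is a hypothetical registry with the documented behavior: one factory per scheme, a panic on duplicates, and lookup by URL scheme. The QueueFactory type, OpenQueue, and the error text are assumptions. The real implementation in internal/agent/queue may differ.

```go
package main

import (
	"fmt"
	"net/url"
	"sync"
)

// TaskQueue is a trimmed, hypothetical stand-in for queue.TaskQueue.
type TaskQueue interface{ Close() }

// QueueFactory builds a TaskQueue from the full URL (hypothetical type name).
type QueueFactory func(rawURL string) (TaskQueue, error)

var (
	registryMu sync.RWMutex
	factories  = map[string]QueueFactory{}
)

// RegisterQueue stores a factory for scheme and panics on duplicates.
func RegisterQueue(scheme string, f QueueFactory) {
	registryMu.Lock()
	defer registryMu.Unlock()
	if _, dup := factories[scheme]; dup {
		panic(fmt.Sprintf("queue: scheme %q already registered", scheme))
	}
	factories[scheme] = f
}

// OpenQueue selects a factory by the URL's scheme and calls it with the full URL.
func OpenQueue(rawURL string) (TaskQueue, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return nil, err
	}
	registryMu.RLock()
	f, ok := factories[u.Scheme]
	registryMu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("queue: no backend registered for scheme %q", u.Scheme)
	}
	return f(rawURL)
}
```

In this sketch, forgetting the blank import from Step 2 shows up as the "no backend registered" error, because the backend's init() never ran.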


Testing your backend locally

The ark CLI uses the in-memory queue backend, so you cannot test a custom queue backend with ark run. Instead, write an integration test that:

  1. Starts your broker (or uses a local instance)
  2. Creates a queue instance via your New() constructor
  3. Submits a task, polls it from a goroutine, acks it, and reads the result

The Redis adapter tests (internal/agent/queue/redis/queue_test.go) can serve as a template.


See also