Custom Task Queue Backends
The task queue is the boundary between the operator and agent pods. By default it uses Redis Streams with consumer groups. You can replace it with any message broker by implementing the TaskQueue and StreamChannel interfaces.
The interfaces
// internal/agent/queue/queue.go
type TaskQueue interface {
	Submit(ctx context.Context, prompt string, meta map[string]string) (string, error)
	Poll(ctx context.Context) (*Task, error)
	Ack(task Task, result string, usage TokenUsage) error
	Nack(task Task, reason string) error
	Results(ctx context.Context, taskIDs []string) ([]TaskResult, error)
	Close()
}

type StreamChannel interface {
	Publish(key, chunk string) error
	Done(key string) error
	Read(ctx context.Context, key string) (string, error)
}
TaskQueue handles task lifecycle: submit a task, poll for work, acknowledge completion or failure, and retrieve results.
StreamChannel handles streaming output: publish chunks as the LLM generates them, signal completion, and read the assembled output. The dashboard uses this for live log streaming.
The two interfaces can share the same backend (the Redis adapter does) or use different backends — controlled by TASK_QUEUE_URL and STREAM_CHANNEL_URL separately.
Step 1: Implement the interfaces
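// internal/agent/queue/nats/queue.go
package nats

import (
	"context"
	"errors"

	"github.com/arkonis-dev/ark-operator/internal/agent/queue"
	natsgo "github.com/nats-io/nats.go"
)

// errNotImplemented is returned by the stub methods until you fill them in.
var errNotImplemented = errors.New("nats queue: not implemented")

// NATSQueue is a skeleton TaskQueue backed by NATS JetStream.
type NATSQueue struct {
	conn *natsgo.Conn
	js   natsgo.JetStreamContext
}

// New connects to the NATS server at url and opens a JetStream context.
func New(url string) (*NATSQueue, error) {
	conn, err := natsgo.Connect(url)
	if err != nil {
		return nil, err
	}
	js, err := conn.JetStream()
	if err != nil {
		conn.Close()
		return nil, err
	}
	return &NATSQueue{conn: conn, js: js}, nil
}

func (q *NATSQueue) Submit(ctx context.Context, prompt string, meta map[string]string) (string, error) {
	// TODO: publish to subject, return task ID
	return "", errNotImplemented
}

func (q *NATSQueue) Poll(ctx context.Context) (*queue.Task, error) {
	// TODO: subscribe and fetch next message
	return nil, errNotImplemented
}

func (q *NATSQueue) Ack(task queue.Task, result string, usage queue.TokenUsage) error {
	// TODO: store result and ack the message
	return errNotImplemented
}

func (q *NATSQueue) Nack(task queue.Task, reason string) error {
	// TODO: nack (or re-queue) the message
	return errNotImplemented
}

func (q *NATSQueue) Results(ctx context.Context, taskIDs []string) ([]queue.TaskResult, error) {
	// TODO: fetch results by ID from your store
	return nil, errNotImplemented
}

func (q *NATSQueue) Close() {
	q.conn.Close()
}

// Register on import
func init() {
	queue.RegisterQueue("nats", func(url string) (queue.TaskQueue, error) {
		q, err := New(url)
		if err != nil {
			return nil, err // avoid wrapping a nil *NATSQueue in a non-nil interface
		}
		return q, nil
	})
}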
Step 2: Register the backend
Blank-import your package in the operator entrypoint:
// cmd/main.go
import (
	_ "github.com/arkonis-dev/ark-operator/internal/agent/queue/nats"
)
Do the same in the agent runtime entrypoint (runtime/agent/main.go) — both the operator and the agent pods need to know how to connect.
Step 3: Configure via URL scheme
The backend is selected from the URL scheme in TASK_QUEUE_URL. For a NATS backend:
TASK_QUEUE_URL=nats://nats.messaging.svc.cluster.local:4222
Set it in Helm:
helm upgrade ark-operator arkonis/ark-operator \
  --set taskQueueURL=nats://nats.messaging.svc.cluster.local:4222
If STREAM_CHANNEL_URL is not set, it defaults to TASK_QUEUE_URL (same backend for both task queue and stream channel).
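The fallback rule can be sketched in a few lines of Go. This is an illustration of the documented behavior, not the operator's actual code: resolveURLs and resolveFromEnv are hypothetical helpers, and treating an unset TASK_QUEUE_URL as an error is an assumption.

```go
package main

import (
	"errors"
	"os"
)

// resolveURLs returns the task queue URL and the stream channel URL.
// getenv is injected so the rule can be tested without touching the process environment.
func resolveURLs(getenv func(string) string) (queueURL, streamURL string, err error) {
	queueURL = getenv("TASK_QUEUE_URL")
	if queueURL == "" {
		// Assumption: a missing queue URL is a configuration error.
		return "", "", errors.New("TASK_QUEUE_URL is not set")
	}
	streamURL = getenv("STREAM_CHANNEL_URL")
	if streamURL == "" {
		// Documented fallback: reuse the queue backend for streaming.
		streamURL = queueURL
	}
	return queueURL, streamURL, nil
}

// resolveFromEnv applies the same rule to the real process environment.
func resolveFromEnv() (string, string, error) {
	return resolveURLs(os.Getenv)
}
```

With only TASK_QUEUE_URL set to the NATS URL above, both return values are that URL; setting STREAM_CHANNEL_URL as well changes only the second one.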
Built-in backends
| Scheme | Backend | Interfaces |
|---|---|---|
| redis://, rediss:// | Redis Streams | TaskQueue + StreamChannel |
| memory:// | In-process | TaskQueue only (used by ark CLI) |
RegisterQueue and RegisterStream
// Register a TaskQueue backend
queue.RegisterQueue("myscheme", func(url string) (queue.TaskQueue, error) {
	return MyQueue{url: url}, nil
})

// Register a StreamChannel backend (if different from queue)
queue.RegisterStream("myscheme", func(url string) (queue.StreamChannel, error) {
	return MyStream{url: url}, nil
})
Both functions panic if the scheme is already registered — register each scheme once from an init().
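For intuition, a scheme-keyed registry with the panic-on-duplicate rule could look roughly like the sketch below. It is not the project's implementation: the one-method TaskQueue, the nopQueue stand-in, and the OpenQueue lookup helper are hypothetical, and only the RegisterQueue signature follows the examples above.

```go
package main

import (
	"fmt"
	"net/url"
	"sync"
)

// TaskQueue is reduced to a single method to keep the sketch short.
type TaskQueue interface{ Close() }

// QueueFactory matches the constructor signature passed to RegisterQueue.
type QueueFactory func(rawURL string) (TaskQueue, error)

var (
	mu        sync.RWMutex
	factories = map[string]QueueFactory{}
)

// RegisterQueue stores a factory under a URL scheme and panics on duplicates.
func RegisterQueue(scheme string, f QueueFactory) {
	mu.Lock()
	defer mu.Unlock()
	if _, dup := factories[scheme]; dup {
		panic(fmt.Sprintf("queue: scheme %q already registered", scheme))
	}
	factories[scheme] = f
}

// OpenQueue (hypothetical) picks the factory from the URL scheme and calls it.
func OpenQueue(rawURL string) (TaskQueue, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return nil, fmt.Errorf("queue: parse %q: %w", rawURL, err)
	}
	mu.RLock()
	f, ok := factories[u.Scheme]
	mu.RUnlock()
	if !ok {
		return nil, fmt.Errorf("queue: no backend registered for scheme %q", u.Scheme)
	}
	return f(rawURL)
}

// nopQueue is a stand-in backend used only to exercise the registry.
type nopQueue struct{}

func (nopQueue) Close() {}
```

Panicking at registration time surfaces a scheme collision when the binary starts, instead of silently letting whichever package was imported last win.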
Testing your backend locally
The ark CLI uses the in-memory queue backend, so you cannot test a custom queue backend with ark run. Instead, write an integration test that:
- Starts your broker (or uses a local instance)
- Creates a queue instance via your New() constructor
- Submits a task, polls it from a goroutine, acks it, and reads the result
The Redis adapter tests (internal/agent/queue/redis/queue_test.go) can serve as a template.
See also
- Task Queue concepts: Redis Streams internals and queue semantics
- Custom LLM Providers: implement a custom model provider
- Environment Variables: TASK_QUEUE_URL, STREAM_CHANNEL_URL