
Advanced Configuration

This guide covers advanced configuration options for KubeOps operators, including finalizer management, custom leader election, and durable requeue mechanisms.

Finalizer Management

KubeOps provides automatic finalizer attachment and detachment to ensure proper resource cleanup. These features can be configured through OperatorSettings.

Auto-Attach Finalizers

By default, KubeOps automatically attaches finalizers to entities during reconciliation. This ensures that cleanup operations are performed before resources are deleted.

builder.Services
    .AddKubernetesOperator(settings =>
    {
        // Enable automatic finalizer attachment (default: true)
        settings.AutoAttachFinalizers = true;
    });

When AutoAttachFinalizers is enabled:

  • Finalizers are automatically added to entities during reconciliation
  • You don't need to manually call the EntityFinalizerAttacher delegate
  • All registered finalizers for an entity type are automatically attached

When disabled:

settings.AutoAttachFinalizers = false;

You must manually attach finalizers in your controller:

public class V1DemoEntityController(
    ILogger<V1DemoEntityController> logger,
    EntityFinalizerAttacher<DemoFinalizer, V1DemoEntity> finalizerAttacher)
    : IEntityController<V1DemoEntity>
{
    public async Task<ReconciliationResult<V1DemoEntity>> ReconcileAsync(
        V1DemoEntity entity,
        CancellationToken cancellationToken)
    {
        // Manually attach finalizer
        entity = await finalizerAttacher(entity, cancellationToken);

        // Continue with reconciliation logic
        logger.LogInformation("Reconciling entity {Entity}", entity);
        return ReconciliationResult<V1DemoEntity>.Success(entity);
    }

    public Task<ReconciliationResult<V1DemoEntity>> DeletedAsync(
        V1DemoEntity entity,
        CancellationToken cancellationToken)
    {
        return Task.FromResult(ReconciliationResult<V1DemoEntity>.Success(entity));
    }
}

Auto-Detach Finalizers

KubeOps automatically removes finalizers after successful finalization. This can also be configured:

builder.Services
    .AddKubernetesOperator(settings =>
    {
        // Enable automatic finalizer removal (default: true)
        settings.AutoDetachFinalizers = true;
    });

When AutoDetachFinalizers is enabled:

  • Finalizers are automatically removed when FinalizeAsync returns success

When disabled:

settings.AutoDetachFinalizers = false;

You must then handle finalizer removal yourself, as sketched below. This is usually only worthwhile when you have specific coordination requirements.
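
A minimal sketch of manual removal, assuming KubeOps' IKubernetesClient is available through dependency injection; the finalizer identifier shown here is a placeholder:

public sealed class ManualFinalizerDetacher(IKubernetesClient client)
{
    // Placeholder: use the identifier your finalizer was registered with.
    private const string FinalizerIdentifier = "demo.finalizer.example.com";

    public async Task<V1DemoEntity> DetachAsync(V1DemoEntity entity, CancellationToken cancellationToken)
    {
        // Removing the identifier from metadata.finalizers allows the API server
        // to complete the pending deletion once no finalizers remain.
        if (entity.Metadata.Finalizers?.Remove(FinalizerIdentifier) == true)
        {
            entity = await client.UpdateAsync(entity, cancellationToken);
        }

        return entity;
    }
}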

Use Cases

Keep defaults enabled when:

  • You want standard finalizer behavior
  • Your finalizers follow the typical pattern
  • You don't need fine-grained control

Disable auto-attach when:

  • You need conditional finalizer attachment (see the sketch after these lists)
  • Different instances should have different finalizers
  • You want to attach finalizers based on specific conditions

Disable auto-detach when:

  • You need custom finalizer removal logic
  • You want to coordinate multiple finalizers manually
  • You have external systems that need to confirm cleanup
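
For the conditional-attachment case, here is a sketch building on the controller example above (with AutoAttachFinalizers disabled); the Spec.RequiresCleanup property is a hypothetical flag standing in for whatever condition you actually evaluate:

public async Task<ReconciliationResult<V1DemoEntity>> ReconcileAsync(
    V1DemoEntity entity,
    CancellationToken cancellationToken)
{
    // Attach the finalizer only when this particular entity needs cleanup.
    if (entity.Spec.RequiresCleanup)
    {
        entity = await finalizerAttacher(entity, cancellationToken);
    }

    return ReconciliationResult<V1DemoEntity>.Success(entity);
}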

Custom Leader Election

KubeOps supports different leader election mechanisms through the LeaderElectionType setting. This allows you to control how multiple operator instances coordinate in a cluster.

Leader Election Types

public enum LeaderElectionType
{
    None = 0,   // No leader election - all instances process events
    Single = 1, // Single leader election - only one instance processes events
    Custom = 2  // Custom implementation - user-defined coordination
}

Configuration

builder.Services
    .AddKubernetesOperator(settings =>
    {
        settings.LeaderElectionType = LeaderElectionType.Single;
        settings.LeaderElectionLeaseDuration = TimeSpan.FromSeconds(15);
        settings.LeaderElectionRenewDeadline = TimeSpan.FromSeconds(10);
        settings.LeaderElectionRetryPeriod = TimeSpan.FromSeconds(2);
    });

Implementing Custom Leader Election

The Custom leader election type allows you to implement your own coordination logic, such as namespace-based leader election.

Example: Namespace-Based Leader Election

In some scenarios, you may want different operator instances to handle different namespaces. This enables horizontal scaling while maintaining isolation.

Step 1: Implement a custom ResourceWatcher

public sealed class NamespacedLeaderElectionResourceWatcher<TEntity>(
    ActivitySource activitySource,
    ILogger<NamespacedLeaderElectionResourceWatcher<TEntity>> logger,
    IReconciler<TEntity> reconciler,
    OperatorSettings settings,
    IEntityLabelSelector<TEntity> labelSelector,
    IKubernetesClient client,
    INamespaceLeadershipManager namespaceLeadershipManager)
    : ResourceWatcher<TEntity>(
        activitySource,
        logger,
        reconciler,
        settings,
        labelSelector,
        client)
    where TEntity : IKubernetesObject<V1ObjectMeta>
{
    protected override async Task<ReconciliationResult<TEntity>> OnEventAsync(
        WatchEventType eventType,
        TEntity entity,
        CancellationToken cancellationToken)
    {
        // Check if this instance is responsible for the entity's namespace
        if (!await namespaceLeadershipManager.IsResponsibleForNamespace(
                entity.Namespace(),
                cancellationToken))
        {
            // Skip processing - another instance handles this namespace
            return ReconciliationResult<TEntity>.Success(entity);
        }

        // Process the event
        return await base.OnEventAsync(eventType, entity, cancellationToken);
    }
}

Step 2: Implement the leadership manager

public interface INamespaceLeadershipManager
{
    Task<bool> IsResponsibleForNamespace(string @namespace, CancellationToken cancellationToken);
}

public class NamespaceLeadershipManager : INamespaceLeadershipManager
{
    private readonly ConcurrentDictionary<string, bool> _namespaceResponsibility = new();

    public Task<bool> IsResponsibleForNamespace(
        string @namespace,
        CancellationToken cancellationToken)
    {
        // Implement your logic here:
        // - Consistent hashing of namespace names
        // - Lease-based namespace assignment
        // - External coordination service (e.g., etcd, Consul)

        return Task.FromResult(_namespaceResponsibility.GetOrAdd(
            @namespace,
            ns => CalculateResponsibility(ns)));
    }

    private static bool CalculateResponsibility(string @namespace)
    {
        // Example: Simple hash-based distribution
        var instanceId = Environment.GetEnvironmentVariable("POD_NAME") ?? "instance-0";
        var instanceCount = int.Parse(
            Environment.GetEnvironmentVariable("REPLICA_COUNT") ?? "1");

        // string.GetHashCode() is randomized per process in .NET, so compute a
        // deterministic hash to ensure every instance agrees on the assignment.
        var namespaceHash = 0;
        foreach (var c in @namespace)
        {
            namespaceHash = unchecked((namespaceHash * 31) + c);
        }

        var assignedInstance = Math.Abs(namespaceHash % instanceCount);
        var currentInstance = int.Parse(instanceId.Split('-').Last());

        return assignedInstance == currentInstance;
    }
}

Step 3: Register the custom watcher

builder.Services
    .AddKubernetesOperator(settings =>
    {
        settings.LeaderElectionType = LeaderElectionType.Custom;
    });

builder.Services
    .AddSingleton<INamespaceLeadershipManager, NamespaceLeadershipManager>()
    .AddHostedService<NamespacedLeaderElectionResourceWatcher<V1DemoEntity>>();

Benefits of Custom Leader Election

  • Horizontal Scaling: Multiple instances can process different subsets of resources
  • Namespace Isolation: Different teams or environments can have dedicated operator instances
  • Geographic Distribution: Route requests to instances in specific regions
  • Load Balancing: Distribute work across multiple instances

Custom Requeue Mechanism

By default, KubeOps uses an in-memory queue for requeuing entities. This queue is volatile and does not survive operator restarts. For production scenarios, you may want to implement a durable queue.

Default Behavior

The default ITimedEntityQueue<TEntity> implementation:

  • Stores requeue entries in memory
  • Processes them after the specified delay
  • Loses pending requeues on operator restart

Implementing a Durable Queue

You can implement ITimedEntityQueue<TEntity> to use external queue systems like Azure Service Bus, RabbitMQ, or AWS SQS.

Example: Azure Service Bus Integration

Step 1: Implement ITimedEntityQueue

public sealed class DurableTimedEntityQueue<TEntity>(
    ServiceBusClient serviceBusClient,
    IEntityRequeueQueueNameProvider queueNameProvider,
    TimeProvider timeProvider)
    : ITimedEntityQueue<TEntity>
    where TEntity : IKubernetesObject<V1ObjectMeta>
{
    private readonly ServiceBusSender _sender = serviceBusClient.CreateSender(
        queueNameProvider.GetRequeueQueueName<TEntity>());

    public async Task Enqueue(
        TEntity entity,
        RequeueType type,
        TimeSpan requeueIn,
        CancellationToken cancellationToken)
    {
        var entry = new RequeueEntry<TEntity>() { Entity = entity, RequeueType = type };
        var message = new ServiceBusMessage(BinaryData.FromObjectAsJson(entry));

        // Schedule the message for future delivery
        await _sender.ScheduleMessageAsync(
            message,
            timeProvider.GetUtcNow().Add(requeueIn),
            cancellationToken);
    }

    public Task Remove(TEntity entity, CancellationToken cancellationToken)
    {
        // No explicit removal: cancelling a scheduled message would require the
        // sequence number returned by ScheduleMessageAsync. Stale entries are
        // instead filtered out by the processor's entity-existence check.
        return Task.CompletedTask;
    }

    public async IAsyncEnumerator<RequeueEntry<TEntity>> GetAsyncEnumerator(
        CancellationToken cancellationToken = default)
    {
        // Not used with an external queue - processing happens via the processor
        await Task.CompletedTask;
        yield break;
    }

    public void Dispose()
    {
        _sender.DisposeAsync().AsTask().GetAwaiter().GetResult();
    }
}

Step 2: Implement a background service to process messages

public sealed class DurableEntityRequeueBackgroundService<TEntity>(
    ServiceBusClient serviceBusClient,
    IKubernetesClient kubernetesClient,
    IReconciler<TEntity> reconciler,
    IEntityRequeueQueueNameProvider queueNameProvider,
    ILogger<DurableEntityRequeueBackgroundService<TEntity>> logger)
    : BackgroundService
    where TEntity : IKubernetesObject<V1ObjectMeta>
{
    private readonly ServiceBusProcessor _processor = serviceBusClient.CreateProcessor(
        queueNameProvider.GetRequeueQueueName<TEntity>(),
        new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = 1,
            AutoCompleteMessages = false
        });

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += ProcessMessageAsync;
        _processor.ProcessErrorAsync += ProcessErrorAsync;

        await _processor.StartProcessingAsync(stoppingToken);

        try
        {
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected during shutdown
        }
    }

    private async Task ProcessMessageAsync(ProcessMessageEventArgs args)
    {
        var entry = args.Message.Body.ToObjectFromJson<RequeueEntry<TEntity>>();

        // Verify entity still exists
        var entity = await kubernetesClient.GetAsync<TEntity>(
            entry.Entity.Name(),
            entry.Entity.Namespace(),
            args.CancellationToken);

        if (entity == null)
        {
            logger.LogInformation(
                "Skipping reconciliation for deleted entity {Name}",
                entry.Entity.Name());
            await args.CompleteMessageAsync(args.Message, args.CancellationToken);
            return;
        }

        // Complete message before reconciliation to avoid reprocessing
        await args.CompleteMessageAsync(args.Message, args.CancellationToken);

        // Trigger reconciliation
        await reconciler.Reconcile(
            ReconciliationContext<TEntity>.CreateFromOperatorEvent(
                entity,
                entry.RequeueType.ToWatchEventType()),
            args.CancellationToken);
    }

    private Task ProcessErrorAsync(ProcessErrorEventArgs args)
    {
        logger.LogError(args.Exception, "Error processing requeue message");
        return Task.CompletedTask;
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        await _processor.StopProcessingAsync(cancellationToken);
        _processor.ProcessMessageAsync -= ProcessMessageAsync;
        _processor.ProcessErrorAsync -= ProcessErrorAsync;
        await _processor.DisposeAsync();
        await base.StopAsync(cancellationToken);
    }
}

Step 3: Register the durable queue

builder.Services
    .AddSingleton<ServiceBusClient>(sp =>
        new ServiceBusClient(configuration["ServiceBus:ConnectionString"]))
    .AddSingleton<IEntityRequeueQueueNameProvider, EntityRequeueQueueNameProvider>()
    .AddKubernetesOperator()
    .RegisterComponents();

// Replace the default queue with the durable implementation
builder.Services.Replace(ServiceDescriptor.Singleton(
    typeof(ITimedEntityQueue<>),
    typeof(DurableTimedEntityQueue<>)));

// Add the background service to process messages
builder.Services.AddHostedService<DurableEntityRequeueBackgroundService<V1DemoEntity>>();

Step 4: Create the queue name provider

public interface IEntityRequeueQueueNameProvider
{
    string GetRequeueQueueName<TEntity>() where TEntity : IKubernetesObject<V1ObjectMeta>;
}

public class EntityRequeueQueueNameProvider : IEntityRequeueQueueNameProvider
{
    public string GetRequeueQueueName<TEntity>()
        where TEntity : IKubernetesObject<V1ObjectMeta>
    {
        return $"operator-requeue-{typeof(TEntity).Name.ToLowerInvariant()}";
    }
}

Benefits of Durable Requeues

  • Persistence: Requeue requests survive operator restarts
  • Reliability: Messages are not lost during failures
  • Scalability: External queue systems can handle high volumes
  • Observability: Queue metrics provide insights into requeue patterns
  • Coordination: Multiple operator instances can share the same queue

Combining Custom Leader Election with Durable Queues

For advanced scenarios, you can combine namespace-based leader election with durable queues:

public sealed class NamespacedLeaderElectionEntityRequeueBackgroundService<TEntity>(
    ServiceBusClient serviceBusClient,
    IKubernetesClient kubernetesClient,
    IReconciler<TEntity> reconciler,
    IEntityRequeueQueueNameProvider queueNameProvider,
    INamespaceLeadershipManager namespaceLeadershipManager,
    ILogger<NamespacedLeaderElectionEntityRequeueBackgroundService<TEntity>> logger)
    : BackgroundService
    where TEntity : IKubernetesObject<V1ObjectMeta>
{
    private readonly ServiceBusProcessor _processor = serviceBusClient.CreateProcessor(
        queueNameProvider.GetRequeueQueueName<TEntity>(),
        new ServiceBusProcessorOptions
        {
            MaxConcurrentCalls = 1,
            AutoCompleteMessages = false
        });

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += ProcessMessageAsync;
        _processor.ProcessErrorAsync += ProcessErrorAsync;
        await _processor.StartProcessingAsync(stoppingToken);

        try
        {
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected during shutdown
        }
    }

    private async Task ProcessMessageAsync(ProcessMessageEventArgs args)
    {
        var entry = args.Message.Body.ToObjectFromJson<RequeueEntry<TEntity>>();

        // Verify entity still exists
        var entity = await kubernetesClient.GetAsync<TEntity>(
            entry.Entity.Name(),
            entry.Entity.Namespace(),
            args.CancellationToken);

        if (entity == null)
        {
            logger.LogInformation("Entity no longer exists, completing message");
            await args.CompleteMessageAsync(args.Message, args.CancellationToken);
            return;
        }

        // Check if this instance is responsible for the namespace
        if (!await namespaceLeadershipManager.IsResponsibleForNamespace(
                entity.Namespace(),
                args.CancellationToken))
        {
            logger.LogInformation(
                "Not responsible for namespace {Namespace}, abandoning message",
                entity.Namespace());

            // Abandon the message so another instance can process it
            await args.AbandonMessageAsync(args.Message, cancellationToken: args.CancellationToken);
            return;
        }

        // Complete message and trigger reconciliation
        await args.CompleteMessageAsync(args.Message, args.CancellationToken);

        await reconciler.Reconcile(
            ReconciliationContext<TEntity>.CreateFromOperatorEvent(
                entity,
                entry.RequeueType.ToWatchEventType()),
            args.CancellationToken);
    }

    private Task ProcessErrorAsync(ProcessErrorEventArgs args)
    {
        logger.LogError(args.Exception, "Error processing requeue message");
        return Task.CompletedTask;
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        await _processor.StopProcessingAsync(cancellationToken);
        _processor.ProcessMessageAsync -= ProcessMessageAsync;
        _processor.ProcessErrorAsync -= ProcessErrorAsync;
        await _processor.DisposeAsync();
        await base.StopAsync(cancellationToken);
    }
}

This combination provides:

  • Namespace-based work distribution across operator instances
  • Durable requeue persistence for reliability
  • Message abandonment when an instance is not responsible, allowing proper routing

Best Practices

Finalizer Management

  1. Keep defaults enabled for most use cases
  2. Monitor finalizer attachment in your logs
  3. Test finalizer behavior in development before production
  4. Handle finalizer failures gracefully with proper error messages

Leader Election

  1. Start with Single for simple deployments
  2. Use Custom only when you need advanced coordination
  3. Test failover scenarios to ensure seamless transitions
  4. Monitor leader election status in your logs
  5. Set appropriate lease durations based on your workload

Requeue Mechanisms

  1. Use in-memory queue for development and simple operators
  2. Implement durable queues for production workloads
  3. Monitor queue depth to detect processing issues
  4. Set appropriate requeue delays to avoid overwhelming your system
  5. Handle message deduplication to prevent duplicate processing (see the sketch after this list)
  6. Test restart scenarios to ensure requeues survive failures
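
For point 5, one hedged option with Azure Service Bus is its built-in duplicate detection: enable duplicate detection on the queue itself and give each scheduled message a deterministic MessageId. Below is a variant of the Enqueue method from the durable queue shown earlier, using the entity UID plus resourceVersion as an example deduplication key (the exact key is your choice):

public async Task Enqueue(
    TEntity entity,
    RequeueType type,
    TimeSpan requeueIn,
    CancellationToken cancellationToken)
{
    var entry = new RequeueEntry<TEntity>() { Entity = entity, RequeueType = type };
    var message = new ServiceBusMessage(BinaryData.FromObjectAsJson(entry))
    {
        // With duplicate detection enabled on the queue, the broker drops
        // messages that reuse a MessageId within the detection window.
        // UID + resourceVersion is one possible deduplication key.
        MessageId = $"{entity.Metadata.Uid}-{entity.Metadata.ResourceVersion}",
    };

    await _sender.ScheduleMessageAsync(
        message,
        timeProvider.GetUtcNow().Add(requeueIn),
        cancellationToken);
}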

Troubleshooting

Finalizers Not Attaching

  • Check settings.AutoAttachFinalizers is true
  • Verify finalizer is registered with AddFinalizer<TFinalizer, TEntity>()
  • Check logs for finalizer attachment errors

Leader Election Issues

  • Verify RBAC permissions for lease resources
  • Check network connectivity between instances
  • Review lease duration settings
  • Monitor logs for leader election events
  • Ensure cluster time and local time are synchronized (time drift can cause lease issues)

Requeue Problems

  • Verify queue connection (for durable queues)
  • Check queue permissions and quotas
  • Monitor message processing errors
  • Review requeue delay settings
  • Ensure entities still exist before reprocessing