Async Programming Patterns

Purpose

This guide shows how to use Rcon with asynchronous and reactive programming patterns.

Overview

Rcon provides both synchronous (sendCommand()) and asynchronous (sendCommandAsync()) APIs. The async API returns CompletableFuture<RconResponse> for non-blocking operations.

Built-in Async API

Basic Async Command

// Build a client for the RCON endpoint on localhost:25575.
RconClient client = RconClient.builder()
    .host("localhost")
    .port(25575)
    .password("password")
    .build();

// Kick off the command; the result is delivered through the returned future.
CompletableFuture<RconResponse> pending = client.sendCommandAsync("list");

// ... the calling thread can do other work here while the command executes ...

// get() blocks until the response is available. It declares the checked
// InterruptedException and ExecutionException, so the enclosing method
// must catch or declare them.
System.out.println(pending.get().getResponse());

With Callbacks

// Print the seed when the command succeeds; report the failure otherwise.
client.sendCommandAsync("seed")
    .thenAccept(reply -> System.out.println("Seed: " + reply.getResponse()))
    .exceptionally(error -> {
        System.err.println("Failed: " + error.getMessage());
        // The stage carries no value (Void), so null is the only possible result.
        return null;
    });

Chaining Multiple Commands

client.sendCommandAsync("list")
    .thenCompose(response -> {
        System.out.println("Players: " + response.getResponse());
        return client.sendCommandAsync("seed");
    })
    .thenAccept(response -> {
        System.out.println("Seed: " + response.getResponse());
    });

Parallel Execution

Multiple Independent Commands

// Issue every command before waiting on any of them, so all three are
// in flight at the same time.
CompletableFuture<RconResponse> players = client.sendCommandAsync("list");
CompletableFuture<RconResponse> seed = client.sendCommandAsync("seed");
CompletableFuture<RconResponse> difficulty = client.sendCommandAsync("difficulty");

// allOf completes only after all three futures have completed, so the
// join() calls inside the callback return immediately.
CompletableFuture.allOf(players, seed, difficulty)
    .thenRun(() -> {
        System.out.println("List: " + players.join().getResponse());
        System.out.println("Seed: " + seed.join().getResponse());
        System.out.println("Difficulty: " + difficulty.join().getResponse());
    });

Batch Processing

List<String> commands = List.of("list", "seed", "difficulty", "weather");

List<CompletableFuture<RconResponse>> futures = commands.stream()
    .map(client::sendCommandAsync)
    .toList();

// Collect all responses
CompletableFuture<Void> allOf = CompletableFuture.allOf(
    futures.toArray(new CompletableFuture[0])
);

List<RconResponse> responses = allOf.thenApply(v ->
    futures.stream()
        .map(CompletableFuture::join)
        .toList()
).join();

Custom Executor Service

Using Dedicated Thread Pool

// Dedicated pool used both for the command and for processing its response.
ExecutorService executor = Executors.newFixedThreadPool(10);

CompletableFuture<RconResponse> future = client.sendCommandAsync("list", executor);

future.thenAcceptAsync(response -> {
    // Process response in executor thread
    System.out.println("Response: " + response.getResponse());
}, executor)
    // Cleanup: shut the pool down only after the processing stage has finished
    // (or was skipped because the command failed). The shutdown must hang off
    // the processing stage, not off `future` itself: CompletableFuture does not
    // guarantee the order in which dependents of the same future run, so a
    // shutdown registered directly on `future` can fire before the async stage
    // is submitted, and the pool would then reject it.
    .whenComplete((ignored, error) -> executor.shutdown());

Virtual Threads (Java 21+)

// Executor that starts a new virtual thread for every submitted task.
ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();

// Hand the executor to the client and print the response once it arrives.
client.sendCommandAsync("list", virtualExecutor)
    .thenAccept(reply -> System.out.println("Response: " + reply.getResponse()));

Reactive Integration

Project Reactor

import reactor.core.publisher.Mono;

public class ReactiveRcon {
    private final RconClient client;

    public Mono<String> sendCommand(String command) {
        return Mono.fromCallable(() ->
            client.sendCommand(command).getResponse()
        );
    }

    public Mono<List<String>> sendCommands(List<String> commands) {
        return Flux.fromIterable(commands)
            .flatMap(this::sendCommand)
            .collectList();
    }
}

Usage:

ReactiveRcon reactive = new ReactiveRcon(client);

// Pass both a value consumer and an error consumer to subscribe(). With a bare
// subscribe(), an error has no handler and Reactor additionally reports it as
// an unhandled error, even if doOnError has already logged it.
reactive.sendCommand("list")
    .subscribe(
        System.out::println,
        e -> System.err.println("Error: " + e)
    );

RxJava

import io.reactivex.Single;

public class RxRcon {
    private final RconClient client;

    public Single<String> sendCommand(String command) {
        return Single.fromCallable(() ->
            client.sendCommand(command).getResponse()
        );
    }

    public Single<List<String>> sendCommands(List<String> commands) {
        return Observable.fromIterable(commands)
            .flatMapSingle(this::sendCommand)
            .toList();
    }
}

Coroutine Integration (Kotlin)

Suspend Functions

import kotlinx.coroutines.*

/** Coroutine-friendly wrapper around the blocking [RconClient] API. */
class CoroutineRcon(private val client: RconClient) {
    /** Runs [command] on the IO dispatcher and returns the response text. */
    suspend fun sendCommand(command: String): String {
        return withContext(Dispatchers.IO) {
            val reply = client.sendCommand(command)
            reply.response
        }
    }

    /**
     * Starts one coroutine per command and suspends until all have finished.
     * The results are in the same order as [commands].
     */
    suspend fun sendCommands(commands: List<String>): List<String> {
        return coroutineScope {
            val jobs = commands.map { cmd -> async { sendCommand(cmd) } }
            jobs.awaitAll()
        }
    }
}

Usage:

val rcon = CoroutineRcon(client)

// runBlocking bridges into coroutine code from a regular (blocking) caller.
runBlocking {
    // Single command.
    println(rcon.sendCommand("list"))

    // Several commands; one line is printed per response.
    for (line in rcon.sendCommands(listOf("list", "seed", "difficulty"))) {
        println(line)
    }
}

Error Handling

Retry Pattern

/**
 * Sends a command and retries it after a one-second delay each time it fails.
 *
 * @param command    the command to send
 * @param maxRetries how many additional attempts to make after the first
 *                   failure; zero or less means no retry
 * @return a future completing with the first successful response, or failing
 *         with the error of the last attempt once the retries are used up
 */
public CompletableFuture<RconResponse> sendCommandWithRetry(
    String command,
    int maxRetries
) {
    return sendCommandAsync(command)
        .exceptionallyCompose(throwable -> {
            if (maxRetries <= 0) {
                return CompletableFuture.failedFuture(throwable);
            }
            // Executor.execute() returns void, so the retry cannot be chained
            // from it. Instead, start the next attempt via supplyAsync on the
            // delayed executor (which runs it after the wait) and flatten the
            // resulting future-of-a-future.
            return CompletableFuture
                .supplyAsync(
                    () -> sendCommandWithRetry(command, maxRetries - 1),
                    CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS))
                .thenCompose(retry -> retry);
        });
}

Circuit Breaker

/**
 * Wraps an {@code RconClient} with a simple circuit breaker.
 *
 * <p>After {@code threshold} consecutive failures the circuit opens and
 * commands fail immediately. Once more than {@code timeoutMillis} have passed
 * since the last failure, the circuit closes again and commands are let through.
 *
 * <p>The breaker state is read on the caller's thread and updated from the
 * completion callback of the async command, which may run on another thread,
 * so every access goes through a synchronized method.
 */
public class CircuitBreakerRcon {
    private final RconClient client;
    private final int threshold;
    private final long timeoutMillis;

    // Breaker state; guarded by "this".
    private int failureCount = 0;
    private long lastFailureTime = 0;
    private boolean circuitOpen = false;

    /**
     * @param client        the client used to execute commands
     * @param threshold     number of consecutive failures that opens the circuit
     * @param timeoutMillis how long the circuit stays open after the last failure
     */
    public CircuitBreakerRcon(RconClient client, int threshold, long timeoutMillis) {
        this.client = client;
        this.threshold = threshold;
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Sends a command unless the circuit is currently open.
     *
     * @param command the command to send
     * @return the client's future, or a future already failed with
     *         {@code RconConnectionException} while the circuit is open
     */
    public CompletableFuture<RconResponse> sendCommand(String command) {
        if (!allowRequest()) {
            return CompletableFuture.failedFuture(
                new RconConnectionException("Circuit breaker is open")
            );
        }

        return client.sendCommandAsync(command)
            .whenComplete((response, throwable) -> {
                if (throwable != null) {
                    recordFailure();
                } else {
                    recordSuccess();
                }
            });
    }

    /** Returns true if a command may be sent, closing the circuit if its timeout has elapsed. */
    private synchronized boolean allowRequest() {
        if (!circuitOpen) {
            return true;
        }
        long timeSinceLastFailure = System.currentTimeMillis() - lastFailureTime;
        if (timeSinceLastFailure > timeoutMillis) {
            circuitOpen = false;
            failureCount = 0;
            return true;
        }
        return false;
    }

    /** Counts a failure and opens the circuit once the threshold is reached. */
    private synchronized void recordFailure() {
        failureCount++;
        lastFailureTime = System.currentTimeMillis();
        if (failureCount >= threshold) {
            circuitOpen = true;
        }
    }

    /** Resets the consecutive-failure count after a successful command. */
    private synchronized void recordSuccess() {
        failureCount = 0;
    }
}

Timeout Handling

Per-Command Timeout

/**
 * Sends a command and fails the result if no response arrives in time.
 *
 * @param command the command to send
 * @param timeout the maximum time to wait for the response
 * @return a future completing with the response, or completing exceptionally
 *         with {@link java.util.concurrent.TimeoutException} if {@code timeout}
 *         elapses first
 */
public CompletableFuture<RconResponse> sendCommandWithTimeout(
    String command,
    Duration timeout
) {
    // orTimeout fails the future with a TimeoutException directly, so no
    // null sentinel is needed to signal the timeout.
    return client.sendCommandAsync(command)
        .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
}

Metrics and Monitoring

Timing Commands

/**
 * Decorator that records latency and success/error counts for every command
 * sent through the wrapped {@code RconClient}.
 */
public class MetricsRconClient {
    private final RconClient delegate;
    private final MeterRegistry meterRegistry;

    /**
     * @param delegate      the client that actually executes commands
     * @param meterRegistry the registry the timer and counters are recorded in
     */
    public MetricsRconClient(RconClient delegate, MeterRegistry meterRegistry) {
        this.delegate = delegate;
        this.meterRegistry = meterRegistry;
    }

    /**
     * Sends a command and records its metrics once it completes.
     *
     * @param command the command to send
     * @return the delegate's future; its outcome is not altered by the metrics
     */
    public CompletableFuture<RconResponse> sendCommand(String command) {
        // Start timing before the command is dispatched.
        Timer.Sample sample = Timer.start(meterRegistry);

        return delegate.sendCommandAsync(command)
            .whenComplete((response, throwable) -> {
                // Latency is recorded for successes and failures alike.
                sample.stop(meterRegistry.timer("rcon.command.latency"));

                if (throwable != null) {
                    meterRegistry.counter("rcon.command.errors").increment();
                } else {
                    meterRegistry.counter("rcon.command.success").increment();
                }
            });
    }
}

See Also


This site uses Just the Docs, a documentation theme for Jekyll.