package com.example.messaging import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import java.time.Instant import java.util.UUID // ---------- Data Classes ---------- /** * Represents a message in a pub/sub system. */ data class Message( val id: String = UUID.randomUUID().toString(), val topic: String, val payload: String, val headers: Map = emptyMap(), val timestamp: Instant = Instant.now(), ) { val age: Long get() = Instant.now().epochSecond - timestamp.epochSecond fun withHeader(key: String, value: String): Message = copy(headers = headers + (key to value)) } /** * Subscriber identity and delivery preferences. */ data class Subscriber( val id: String = UUID.randomUUID().toString(), val name: String, val topics: Set, val maxRetries: Int = 3, ) // ---------- Sealed Result ---------- sealed class DeliveryResult { data class Success(val subscriberId: String, val messageId: String) : DeliveryResult() data class Failure(val subscriberId: String, val messageId: String, val reason: String) : DeliveryResult() data class Retry(val subscriberId: String, val messageId: String, val attempt: Int) : DeliveryResult() } // ---------- Extension Functions ---------- /** Check whether a message matches a subscriber's topic filter. */ fun Message.matchesTopic(pattern: String): Boolean { if (pattern == "*") return true if (pattern.endsWith(".*")) { val prefix = pattern.dropLast(2) return topic.startsWith(prefix) } return topic == pattern } /** Pretty-print a message for logging. */ fun Message.toLogString(): String = "[$topic] ${payload.take(80)}${if (payload.length > 80) "..." else ""} (${age}s ago)" // ---------- Broker ---------- /** * A simple coroutine-based message broker. * * Subscribers register for topics, and published messages are dispatched * concurrently to all matching subscribers. */ class MessageBroker(private val bufferSize: Int = 64) { private val subscribers = mutableListOf() private val channel = Channel(bufferSize) private val deliveryLog = mutableListOf() val subscriberCount: Int get() = subscribers.size val deliveryHistory: List get() = deliveryLog.toList() fun subscribe(subscriber: Subscriber): MessageBroker = apply { subscribers.add(subscriber) } fun unsubscribe(subscriberId: String): Boolean = subscribers.removeAll { it.id == subscriberId } suspend fun publish(message: Message) { channel.send(message) } /** * Start the broker's dispatch loop. Runs until the scope is cancelled. */ fun CoroutineScope.startDispatching(): Job = launch { for (message in channel) { val matched = subscribers.filter { sub -> sub.topics.any { message.matchesTopic(it) } } matched.map { sub -> async { deliver(sub, message) } }.awaitAll() } } private suspend fun deliver(subscriber: Subscriber, message: Message): DeliveryResult { repeat(subscriber.maxRetries) { attempt -> try { // Simulate async delivery with a small delay delay(10) val result = DeliveryResult.Success(subscriber.id, message.id) deliveryLog.add(result) return result } catch (e: Exception) { val retry = DeliveryResult.Retry(subscriber.id, message.id, attempt + 1) deliveryLog.add(retry) } } val failure = DeliveryResult.Failure(subscriber.id, message.id, "max retries exceeded") deliveryLog.add(failure) return failure } companion object { /** Create a broker pre-configured with a list of subscribers. */ fun withSubscribers(vararg subscribers: Subscriber): MessageBroker { val broker = MessageBroker() subscribers.forEach { broker.subscribe(it) } return broker } } }