Engineering in the Wild

Scaling Platforms: Managing Complexity with Interfaces & Adapters

TLDR

Following the beta launch of our platform, I had two big challenges to solve:

Continuing development as-is on our base system architecture would have accumulated technical debt at an increasing rate of interest. We decided to refactor in a step-wise manner and adopt the ports-and-adapters pattern. The larger vision was to lay the groundwork for a transition to an event-driven architecture, which is better suited to ML platforms with long-running processes and monitoring.

OK, let's dive into it.


The Problem: Infrastructure Dependencies Limiting System Flexibility

For our application, we faced several architectural challenges that hindered scalability and maintainability:

If you've been an engineer dealing with any of the above, then you'll agree that it really turns the work into a grind. At the end of the day, I think it feels so much better to be working in a system that feels well-engineered™.

This is a short post about how the Ports and Adapters pattern was adapted to solve these challenges, especially in applications dealing with ML and analytics.

Core Architectural Principles

The Fundamental Rule: Domain Independence

The cornerstone principle of Ports and Adapters architecture is straightforward and powerful:

"Internal modules should not be aware of outer modules."

This creates an outward dependency flow that shields core logic from external implementation details like infrastructure. Essentially, its dependency topology is a directed acyclic graph (DAG) that prevents circular references.

Rather than the traditional "hexagon" metaphor, I find it more intuitive to visualize this as an n-sided polygon, where:

For our ML platform, we categorized these external concerns into two practical groups:

Initialization-Bound Interfaces
These are resolved when the application starts up, typically varying by deployment environment:

Request-Bound Interfaces
These are resolved during request handling, often determined by request parameters:
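To make the distinction concrete, here's a minimal sketch of the two resolution points. The adapter and loader names are illustrative, not from our actual codebase:

import os

# Hypothetical adapter stubs, purely for illustration
class S3StorageAdapter: ...
class LocalFileStorageAdapter: ...

def bootstrap_services() -> dict:
    """Initialization-bound: resolved once, when the application starts up."""
    env = os.getenv("ENVIRONMENT", "development")
    storage = S3StorageAdapter() if env == "production" else LocalFileStorageAdapter()
    return {"storage": storage}

class XGBoostLoader: ...
class TorchLoader: ...

# Request-bound: the right adapter is picked per request, from its parameters
MODEL_LOADERS = {"xgboost": XGBoostLoader, "torch": TorchLoader}

def handle_request(request: dict, services: dict):
    """Resolve a model loader from a request parameter, then handle the request."""
    loader = MODEL_LOADERS[request["model_family"]]()
    return loader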

Component Architecture: Enforcing Bounded Contexts Through Layering

The effectiveness of Ports and Adapters relies on discipline in maintaining clear layer boundaries and communication patterns. For our ML platform, we established a strict three-layer architecture that created clear ownership boundaries and drastically simplified testing strategies:

Domain Layer: Pure Logic
I like to think of this as stations in a kitchen. Each is an island dedicated to a specific purpose: the hot line, the grill, etc. This inner layer contains stateless, deterministic components with zero infrastructure dependencies:

I adopted a key design principle: configuration is declarative; computation is functional and stateless. That is, attributes serve only as configuration parameters, while all data transformations occur through pure functions with explicit inputs and outputs, facilitating composition and testability.
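A minimal sketch of what this principle looks like in practice (the scaler below is illustrative, not our actual domain code): attributes hold configuration; the transformation itself is a pure function of its inputs.

from dataclasses import dataclass
from typing import List

@dataclass(frozen=True)
class ScalingConfig:
    """Declarative configuration: no behavior, just parameters."""
    center: float
    scale: float

def scale_features(values: List[float], config: ScalingConfig) -> List[float]:
    """Pure, stateless computation: explicit inputs and outputs, no I/O."""
    return [(v - config.center) / config.scale for v in values]

# Composition and testing fall out naturally: pure functions chain cleanly.
scaled = scale_features([1.0, 2.0, 3.0], ScalingConfig(center=2.0, scale=1.0))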

Application Layer: Use Case Orchestration
This middle layer coordinates interactions between domain logic and external systems. It's like the serving station where the chef assembles the ingredients.

We organized this layer into two service families—api_services for synchronous request handling and event_services for asynchronous processing—creating a clean separation between different interaction patterns.

Infrastructure Layer: Concrete Implementations
This outer layer contains all environment-specific code and external system integrations. It's like the operations team in a restaurant that takes care of reservations, client requests, and supply-chain logistics. It's not sexy, but hell yeah is it useful. :)

This layering discipline created unmistakable clarity in our codebase—every component has exactly one architectural responsibility, with dependencies flowing exclusively outward from domain to infrastructure.
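Concretely, the layout looked roughly like this (directory names are illustrative of the structure, not an exact listing):

backend/
├── domain/              # Pure logic: ports (interfaces), entities, pure functions
│   └── ports/
├── application/         # Use-case orchestration
│   ├── api_services/    # Synchronous request handling
│   └── event_services/  # Asynchronous processing
└── infrastructure/      # Concrete implementations: DBs, queues, schedulers
    └── adapters/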

Traditional vs. Hexagonal Architecture: Dependency Patterns Formalized

The distinction between traditional layered architecture and the Ports & Adapters pattern can be formalized as a difference in dependency graphs. Let me illustrate with a practical comparison:

In the traditional approach, domain logic directly depends on infrastructure implementations:

from datetime import datetime
from pymongo import MongoClient

class PredictionEngine:
    def __init__(self):
        # Direct coupling to specific infrastructure
        self.db_client = MongoClient("mongodb://localhost:27017/")
        self.db = self.db_client["prediction_store"]
        self.collection = self.db["model_predictions"]
    
    def generate_prediction(self, features, model_id):
        # Business logic intermixed with infrastructure concerns
        if not self.collection.find_one({"model_id": model_id}):
            raise ValueError(f"Unknown model: {model_id}")
        
        # Direct dependency on storage details
        model_config = self.collection.find_one({"model_id": model_id})
        model = self._load_model(model_config["path"])
        
        # Core algorithmic work mixed with persistence
        prediction = model.predict(features)
        prediction_id = self.collection.insert_one({
            "model_id": model_id,
            "features": features,
            "result": prediction.tolist(),
            "timestamp": datetime.now()
        }).inserted_id
        
        return {"prediction_id": str(prediction_id), "result": prediction}

The Ports & Adapters pattern establishes a formal dependency inversion, creating a directed acyclic graph of dependencies flowing outward from domain logic. Here are the concepts above, now in code:

# Domain Layer - Abstract interfaces as formal contracts
from datetime import datetime
from typing import Protocol, Dict, Any, Optional, List

class ModelRepositoryInterface(Protocol):
    """Abstract model storage operations"""
    def get_model_config(self, model_id: str) -> Optional[Dict[str, Any]]:
        """Retrieve model configuration by ID"""
        ...
    
    def save_prediction(self, prediction_data: Dict[str, Any]) -> str:
        """Store prediction results and return identifier"""
        ...

class ModelLoaderInterface(Protocol):
    """Abstract model loading operations"""
    def load_model(self, model_path: str) -> Any:
        """Load model from specified path"""
        ...
    
    def predict(self, model: Any, features: List[float]) -> List[float]:
        """Generate prediction using model"""
        ...

# Application Layer - Orchestration over abstract ports
class PredictionEngineService:
    def __init__(self, repository: ModelRepositoryInterface, loader: ModelLoaderInterface):
        # Dependencies on abstractions establish formal boundary
        self.repository = repository
        self.loader = loader
    
    def generate_prediction(self, features: List[float], model_id: str) -> Dict[str, Any]:
        # Business logic independent of infrastructure details
        model_config = self.repository.get_model_config(model_id)
        if not model_config:
            raise ValueError(f"Unknown model: {model_id}")
        
        # Domain operations use abstract interfaces
        model = self.loader.load_model(model_config["path"])
        prediction = self.loader.predict(model, features)
        
        # Persistence through abstract contract
        prediction_id = self.repository.save_prediction({
            "model_id": model_id,
            "features": features,
            "result": prediction,
            "timestamp": datetime.now()
        })
        
        return {"prediction_id": prediction_id, "result": prediction}

This architecture creates two key properties for the system:

The immediate practical benefits include:

Refactoring Strategy

The challenge I faced was implementing these principles on a live codebase. Here are some notes on what worked:

Defining Domain Interfaces

The first step is defining clean interfaces that represent infrastructure requirements. This layer defines what the application needs from external systems without specifying how those needs are fulfilled.

As a first approximation, it simply means taking your existing core module and dividing it into infrastructural concerns and core functionality. The infrastructural concerns move out; the rest can stay.

This approach creates a strong contract: a reliable set of operations that must be satisfied by any concrete implementation.

from abc import ABC, abstractmethod

class ISimulator(ABC):
    """Port interface for Simulation Services"""

    @abstractmethod
    async def run_simulation(self, experiment_id: str, parameters: dict) -> dict:
        ...  # Define the methods the domain needs for simulation

Creating Adapters for Different Environments

For each port, we implement multiple adapters tailored to different environments. For the refactor, we first focused on simple adapters that could run locally. This enabled us to validate the system-level implementations without worrying about the other details.

class PrefectSimulationRunnerService(ISimulator):
    """Adapter for running simulations using Prefect"""
    def __init__(self):
        self.simulator = Simulator() # Core simulation logic

    async def run_simulation(self, experiment_id: str, parameters: dict) -> dict:
        # Implementation for production environment
        ...

class LocalSimulationRunnerService(ISimulator):
    """Adapter for running simulations locally"""
    # Implementation for local environment
    ...

class MockSimulationRunnerService(ISimulator):
    """Mock Adapter for testing purposes"""
    # Implementation for testing environment
    ...

Application Services: Orchestrating Use Cases

The Application Layer orchestrates the use cases of our application by coordinating domain entities and services. It's responsible for translating between domain and external formats and managing workflows. The key difference is that Application Services depend only on abstract interfaces, not concrete implementations.

# In backend/application/simulation_service.py
class APIServices:
    def __init__(
        self,
        atlas_service: IAtlas,
        simulation_service: ISimulator,
        notification_service: INotification
    ):
        self.atlas_service = atlas_service
        self.simulation_service = simulation_service
        self.notification_service = notification_service
    
    async def run_experiment(self, experiment_id: str, parameters: dict):
        # 1. Get necessary data from the Atlas service
        atlas_data = await self.atlas_service.get_atlas_data(experiment_id)
        
        # 2. Combine with parameters and run simulation
        enriched_parameters = {**parameters, "atlas_data": atlas_data}
        simulation_result = await self.simulation_service.run_simulation(
            experiment_id, 
            enriched_parameters
        )
        
        # 3. Notify relevant stakeholders
        await self.notification_service.notify(
            "experiment.completed",
            {
                "experiment_id": experiment_id,
                "status": simulation_result.get("status", "unknown"),
                "summary": simulation_result.get("summary", {})
            }
        )
        
        return {
            "experiment_id": experiment_id,
            "result": simulation_result,
            "status": "completed"
        }

Dynamic Adapter Selection

One of the nice aspects of the Ports & Adapters architecture is the ability to switch between implementations based on the environment. The added complexity of the interface now becomes its strength and actually simplifies configuration.

The system dynamically selects appropriate adapters based on the environment with a simple factory:

import importlib
import os
from enum import Enum
from typing import Any, Dict

class Environment(Enum):
    DEVELOPMENT = "development"
    STAGING = "staging"
    PRODUCTION = "production"
    TESTING = "testing"
    LOCAL = "local"

class ServiceImplementations:
    """Maps service names to implementation classes for each environment"""
    IMPLEMENTATIONS = {
        Environment.PRODUCTION.value: {
            "simulation": "backend.infrastructure.adapters.simulation_adapters.PrefectSimulationRunnerService",
            "atlas": "backend.infrastructure.adapters.atlas_adapters.AtlasDBService",
            "notification": "backend.infrastructure.adapters.notification_adapters.SlackNotificationService",
        },
        Environment.DEVELOPMENT.value: {
            "simulation": "backend.infrastructure.adapters.simulation_adapters.LocalSimulationRunnerService",
            "atlas": "backend.infrastructure.adapters.atlas_adapters.AtlasFileService",
            "notification": "backend.infrastructure.adapters.notification_adapters.ConsoleNotificationService",
        },
        # ... entries for the remaining environments (staging, testing, local)
    }

# Service instance cache
_service_instances: Dict[str, Any] = {}

def get_service_implementation(service_name: str) -> Any:
    """Factory function to get appropriate service implementation based on environment"""
    # Return cached instance if available
    if service_name in _service_instances:
        return _service_instances[service_name]
    
    # Determine current environment
    environment = os.getenv("ENVIRONMENT", Environment.DEVELOPMENT.value)
    
    # Get implementation class path
    implementations = ServiceImplementations.IMPLEMENTATIONS.get(
        environment, 
        ServiceImplementations.IMPLEMENTATIONS[Environment.DEVELOPMENT.value]
    )
    
    implementation_path = implementations.get(service_name)
    if not implementation_path:
        raise ValueError(f"No implementation defined for service '{service_name}' in environment '{environment}'")
    
    # Import and instantiate implementation
    module_path, class_name = implementation_path.rsplit(".", 1)
    module = importlib.import_module(module_path)
    service_class = getattr(module, class_name)
    
    # Cache and return instance
    _service_instances[service_name] = service_class()
    return _service_instances[service_name]

This environment-aware factory approach gave us tremendous flexibility.
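For example, wiring the application service becomes a one-liner per dependency. Here's a sketch of a composition root using the factory above:

def build_api_services() -> APIServices:
    """The only place where concrete adapters get resolved."""
    return APIServices(
        atlas_service=get_service_implementation("atlas"),
        simulation_service=get_service_implementation("simulation"),
        notification_service=get_service_implementation("notification"),
    )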

End Notes

So. The implementation of Ports and Adapters architecture was delivered with some intense keyboard action, but several benefits emerged immediately, such as...

This felt good.

Though powerful, this architecture wasn't without costs:

Overall, am I glad we did this? 100% Yes.

The beauty of this is how it created a system for growth. Instead of brute-force free-for-alls, it enabled engineers to collaborate through well-defined contracts instead of endless video calls and documents.

"No, we DON'T have time to get on a quick call..."


Deep Dives

Ensuring Data Integrity Through Type-Safe Abstractions

One critical challenge in ML systems is ensuring consistent data representation across processing stages. Particularly in performance-sensitive domains where incorrect transformations can lead to costly errors, I needed abstractions that provide both safety and performance. The approach I advocated for applies functional programming principles to dataframe operations while maintaining vectorized performance.

The Problem: Performance & Type Safety

Working with tabular data introduces a fundamental tension between:

Solution: Type-Safe Schema Definition with Runtime Verification

Our approach uses formal data types to model schema constraints, eschewing third-party implementations.

Instead, I relied on simple Enums and ABCs:

from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Tuple

import pandas as pd

class SimulationColumn(Enum):
    """Enum defining simulation columns with their names and types"""
    TIME = ("time", float)
    CONCENTRATION = ("concentration", float)
    TEMPERATURE = ("temperature", float)
    PRESSURE = ("pressure", float)
    
    @property
    def name(self) -> str:
        """Get column name (deliberately shadows Enum's built-in `name`)"""
        return self.value[0]
    
    @property
    def dtype(self) -> type:
        """Get column data type"""
        return self.value[1]
    
    @classmethod
    def all_columns(cls) -> List[str]:
        """Get all column names"""
        return [col.name for col in cls]
    
    @classmethod
    def schema_items(cls) -> List[Tuple[str, type]]:
        """Get all column name-type pairs"""
        return [col.value for col in cls]

# Abstract interface establishing contract for dataframes
class SimulationDataFrameInterface(ABC):
    """Abstract interface for dataframes containing simulation data"""
    
    @abstractmethod
    def has_required_columns(self) -> bool:
        """Check if dataframe has all required columns"""
        pass
    
    @abstractmethod
    def validate_schema(self) -> bool:
        """Validate dataframe schema against requirements"""
        pass
    
    @abstractmethod
    def get_data(self) -> pd.DataFrame:
        """Return the underlying DataFrame"""
        pass


class EnhancedSimulationDataFrame(SimulationDataFrameInterface):
    """DataFrame implementation that conforms to the simulation interface"""
    
    def __init__(self, data: pd.DataFrame):
        """Initialize with a pandas DataFrame"""
        self._data = data
        self._validate_on_init()
        
    # ...validation implementation details...
    
    def get_data(self) -> pd.DataFrame:
        """Return the underlying DataFrame"""
        return self._data

# Domain function that uses the interface
def analyze_simulation_results(df: SimulationDataFrameInterface) -> dict:
    """Analyze simulation results with guaranteed schema compliance"""
    data = df.get_data()
    
    # Safe to access columns using enum properties
    avg_conc = data[SimulationColumn.CONCENTRATION.name].mean()
    max_temp = data[SimulationColumn.TEMPERATURE.name].max()
    
    return {
        "average_concentration": avg_conc,
        "maximum_temperature": max_temp,
        "data_points": len(data)
    }

This approach provides several quantifiable benefits:

The performance overhead of validation is O(k), where k is the number of columns, which is typically negligible compared to the O(m×n) operations performed on dataframes, where m is the row count and n the column count. For our production system, this validation added negligible overhead while eliminating an entire class of runtime errors.
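For reference, the elided _validate_on_init could look roughly like this. This is a sketch of the idea, not our exact implementation: a single pass over the declared schema, hence the O(k) cost.

def _validate_on_init(self) -> None:
    """One pass over the declared schema: O(k) in the number of columns."""
    for column_name, expected_dtype in SimulationColumn.schema_items():
        if column_name not in self._data.columns:
            raise ValueError(f"Missing required column: {column_name}")
        # pandas stores floats as float64, so check for numeric dtype
        if expected_dtype is float and not pd.api.types.is_numeric_dtype(self._data[column_name]):
            raise TypeError(f"Column '{column_name}' must be numeric")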

For downstream engineers who used this pattern, it gave several nice guarantees:

Finally, this approach extends naturally to ML feature pipelines and model inputs, where type and shape consistency are critical. By applying the same pattern to feature transformations, we could ensure that features conform to the expected distribution and type, that model inputs match the training schema, and that prediction outputs maintain a consistent structure. It was a very nice benefit that did not require any additional work.
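As a sketch of that extension (the feature names here are hypothetical, not our actual schema), the same Enum pattern carries over directly:

from enum import Enum
import pandas as pd

class FeatureColumn(Enum):
    """Hypothetical training/serving feature schema, declared once."""
    AGE = ("age", float)
    INCOME = ("income", float)

    @property
    def col(self) -> str:
        return self.value[0]

def validate_inference_input(df: pd.DataFrame) -> pd.DataFrame:
    """Guarantee model inputs match the training schema before predict()."""
    missing = [c.col for c in FeatureColumn if c.col not in df.columns]
    if missing:
        raise ValueError(f"Inference input missing features: {missing}")
    return df[[c.col for c in FeatureColumn]]  # also enforces column order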

For our quantitative platform, this provided crucial guarantees about the validity of numerical operations throughout the processing pipeline, reducing model risk without sacrificing computational performance.

Future-proofing: Event-Driven Architecture Migration Plan

Planning this migration made it evident that the Ports & Adapters architecture provides an excellent foundation for transitioning to an event-driven design.

By defining event-related interfaces in our domain, we abstract away the details of how events are published and consumed, allowing us to implement different event mechanisms in different environments.

At the service layer, we can define two kinds of services: api_services and event_services. Eventually, api_services will simply add items to the event queue: a simple but elegant transition for an active codebase! (A sketch of this transition follows the port definition below.)

Here's an example: for complex operations like model training, we can extend the pattern to support event-driven workflows, enabling an asynchronous processing pattern:

# In backend/domain/ports/event_port.py
from typing import Any, Callable, Protocol

class IEventService(Protocol):
    """Port interface for event publishing and subscription"""
    async def publish(self, event_type: str, payload: dict) -> None:
        """Publish an event with the given type and payload"""
        ...
    
    async def subscribe(self, event_type: str, handler: Callable) -> Any:
        """Subscribe to events of the given type"""
        ...
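With that port in place, the api-to-event transition mentioned above becomes a small change. The service below is hypothetical, shown only to illustrate the shape of the change: the api_service stops doing the work inline and instead publishes an event for an event_service to consume.

class ExperimentAPIService:
    """Hypothetical api_service: enqueues work instead of running it inline."""
    def __init__(self, event_service: IEventService):
        self.event_service = event_service

    async def submit_experiment(self, experiment_id: str, parameters: dict) -> dict:
        # Publish and return immediately; an event_service does the heavy lifting
        await self.event_service.publish(
            "experiment.submitted",
            {"experiment_id": experiment_id, "parameters": parameters},
        )
        return {"experiment_id": experiment_id, "status": "queued"}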

Some Concrete Implementations

# In backend/infrastructure/adapters/event_adapters.py
import json
from typing import Any

class KafkaEventService(IEventService):
    """Production-ready Kafka implementation of the event service"""
    def __init__(self, bootstrap_servers):
        from kafka import KafkaProducer
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.bootstrap_servers = bootstrap_servers
        self.consumers = {}
    
    async def publish(self, event_type: str, payload: dict) -> None:
        """Publish event to Kafka topic"""
        self.producer.send(event_type, payload)
    
    async def subscribe(self, event_type: str, handler: callable) -> Any:
        """Subscribe to Kafka topic"""
        from kafka import KafkaConsumer
        import threading
        
        consumer = KafkaConsumer(
            event_type,
            bootstrap_servers=self.bootstrap_servers,
            value_deserializer=lambda v: json.loads(v.decode('utf-8'))
        )
        
        def consumer_thread():
            for message in consumer:
                handler(message.value)
        
        thread = threading.Thread(target=consumer_thread, daemon=True)
        thread.start()
        
        self.consumers[event_type] = (consumer, thread)
        return thread

class InMemoryEventService(IEventService):
    """Simple in-memory implementation for development and testing"""
    def __init__(self):
        self.subscribers = {}
    
    async def publish(self, event_type: str, payload: dict) -> None:
        """Publish event to in-memory subscribers"""
        if event_type in self.subscribers:
            for handler in self.subscribers[event_type]:
                await handler(payload)
    
    async def subscribe(self, event_type: str, handler: callable) -> Any:
        """Subscribe to in-memory events"""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        
        self.subscribers[event_type].append(handler)
        return len(self.subscribers[event_type]) - 1

This approach allows us to use messaging infrastructure in production while keeping development and testing simple with in-memory event handling. The business logic remains unchanged, as it depends only on the abstract IEventService port.
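In a test, for instance, swapping in the in-memory adapter is all it takes (a quick sketch):

import asyncio

async def test_experiment_events():
    events = InMemoryEventService()
    received = []

    async def handler(payload: dict) -> None:
        received.append(payload)

    await events.subscribe("experiment.completed", handler)
    await events.publish("experiment.completed", {"experiment_id": "exp-42"})
    assert received == [{"experiment_id": "exp-42"}]

asyncio.run(test_experiment_events())

Returning to the model-training example from above: the training service publishes a completion event through the same bus.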

import asyncio

class MLFlowDaedelusService(IDaedelusService):
    def __init__(self, event_bus: IEventService):
        self.event_bus = event_bus

    def train_model_async(self, model_config: ModelConfig):
        # Start asynchronous training as a background task (an asyncio.Task/Future)
        background_task = run_long_training(model_config)

        # Publish an event on completion; done-callbacks receive the finished task
        def on_training_complete(task):
            asyncio.create_task(self.event_bus.publish(
                "model_training_completed",
                {"model_id": model_config.model_id, "results": task.result()}
            ))

        background_task.add_done_callback(on_training_complete)
        return background_task

This pattern creates a composition where outputs from one operation become inputs to another, independent of timing constraints.
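For instance (a hypothetical downstream consumer), an evaluation step can subscribe to the completion event, so training and evaluation compose without either knowing the other's timing:

class EvaluationService:
    """Hypothetical downstream service triggered by the training event."""
    async def evaluate(self, model_id: str, results: dict) -> None:
        print(f"Evaluating {model_id}...")

evaluation_service = EvaluationService()

async def on_model_trained(payload: dict) -> None:
    # Output of training becomes input to evaluation, decoupled in time
    await evaluation_service.evaluate(payload["model_id"], payload["results"])

# Wiring, in the composition root:
# await event_bus.subscribe("model_training_completed", on_model_trained)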

... or at least that's what it does in my head...