Plugin Development

David edited this page Jun 2, 2025 · 3 revisions

LearnMCP-xAPI uses a modular plugin architecture that makes it easy to add support for new Learning Record Stores (LRS). This guide walks you through creating an LRS plugin: understanding the architecture, implementing the plugin, and testing it.

Plugin Architecture Overview

The plugin system is designed around a simple interface that standardizes how LearnMCP-xAPI communicates with different Learning Record Stores while allowing each plugin to handle the specific requirements of its target LRS.

Core Components

  • Plugin Interface: Abstract base class defining the contract all plugins must implement
  • Plugin Manager: Handles plugin discovery, loading, and lifecycle management
  • Configuration System: Unified configuration management with plugin-specific settings
  • Error Handling: Standardized error handling and retry logic across all plugins

Plugin Structure

Each plugin consists of:

  • Plugin Class: Main implementation inheriting from the base plugin interface
  • Configuration Schema: Defines required and optional configuration parameters
  • Authentication Handler: Manages LRS-specific authentication methods
  • Statement Processor: Handles xAPI statement formatting and submission
  • Error Handler: Plugin-specific error handling and recovery logic

Creating a New Plugin

Step 1: Plugin Directory Structure

Create your plugin in the learnmcp_xapi/plugins/ directory:

learnmcp_xapi/plugins/
├── __init__.py
├── base.py                 # Base plugin interface
├── lrsql.py               # LRS SQL plugin
├── ralph.py               # Ralph LRS plugin
├── veracity.py            # Veracity LRS plugin
└── your_lrs.py            # Your new plugin

Step 2: Basic Plugin Template

Start with this template for your new plugin:

"""
Your LRS Plugin for LearnMCP-xAPI
Implements integration with Your Learning Record Store
"""

import asyncio
import json
import logging
from typing import Dict, List, Optional, Any
from urllib.parse import urljoin

import aiohttp
from aiohttp import ClientSession, ClientTimeout

from .base import BaseLRSPlugin


class YourLRSPlugin(BaseLRSPlugin):
    """
    Plugin for Your Learning Record Store
    
    Supports:
    - Basic Authentication
    - OAuth 2.0 (if applicable)
    - Custom authentication methods
    """
    
    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the Your LRS plugin
        
        Args:
            config: Plugin configuration dictionary
        """
        super().__init__(config)
        self.logger = logging.getLogger(__name__)
        
        # Extract configuration
        self.endpoint = config.get('endpoint')
        self.username = config.get('username')
        self.password = config.get('password')
        self.api_key = config.get('api_key')
        self.timeout = config.get('timeout', 30)
        self.retry_attempts = config.get('retry_attempts', 3)
        self.verify_ssl = config.get('verify_ssl', True)
        
        # Validate required configuration
        self._validate_config()
        
        # Initialize session
        self.session: Optional[ClientSession] = None
        
    def _validate_config(self) -> None:
        """Validate plugin configuration"""
        if not self.endpoint:
            raise ValueError("Your LRS endpoint is required")
            
        if not (self.username and self.password) and not self.api_key:
            raise ValueError("Either username/password or API key is required")
    
    async def initialize(self) -> None:
        """Initialize the plugin and establish connection"""
        timeout = ClientTimeout(total=self.timeout)
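        # verify_ssl= is deprecated in aiohttp; ssl=False is the supported way
        # to skip certificate checks, and the default connector verifies them
        if self.verify_ssl:
            connector = aiohttp.TCPConnector()
        else:
            connector = aiohttp.TCPConnector(ssl=False)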
        
        self.session = ClientSession(
            timeout=timeout,
            connector=connector,
            headers=self._get_default_headers()
        )
        
        # Test connection
        await self._test_connection()
        self.logger.info(f"Successfully connected to Your LRS at {self.endpoint}")
    
    async def cleanup(self) -> None:
        """Cleanup resources"""
        if self.session:
            await self.session.close()
    
    def _get_default_headers(self) -> Dict[str, str]:
        """Get default headers for requests"""
        headers = {
            'Content-Type': 'application/json',
            'X-Experience-API-Version': '1.0.3',
            'User-Agent': 'LearnMCP-xAPI/1.0'
        }
        
        # Add authentication headers
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'
        
        return headers
    
    async def _test_connection(self) -> None:
        """Test connection to the LRS"""
        try:
            about_url = urljoin(self.endpoint, '/xapi/about')
            
            auth = None
            if self.username and self.password:
                auth = aiohttp.BasicAuth(self.username, self.password)
            
            async with self.session.get(about_url, auth=auth) as response:
                if response.status == 200:
                    about_data = await response.json()
                    self.logger.debug(f"LRS About: {about_data}")
                else:
                    raise Exception(f"Connection test failed: {response.status}")
                    
        except Exception as e:
            self.logger.error(f"Failed to connect to Your LRS: {e}")
            raise
    
    async def send_statement(self, statement: Dict[str, Any]) -> Dict[str, Any]:
        """
        Send an xAPI statement to the LRS
        
        Args:
            statement: xAPI statement dictionary
            
        Returns:
            Response from the LRS
        """
        if not self.session:
            raise RuntimeError("Plugin not initialized")
        
        statements_url = urljoin(self.endpoint, '/xapi/statements')
        
        # Add statement ID if not present
        if 'id' not in statement:
            import uuid
            statement['id'] = str(uuid.uuid4())
        
        auth = None
        if self.username and self.password:
            auth = aiohttp.BasicAuth(self.username, self.password)
        
        for attempt in range(self.retry_attempts):
            try:
                async with self.session.post(
                    statements_url,
                    json=statement,
                    auth=auth
                ) as response:
                    
                    if response.status in [200, 204]:
                        result = {
                            'status': 'success',
                            'statement_id': statement.get('id'),
                            'lrs_response': await response.text()
                        }
                        self.logger.debug(f"Statement sent successfully: {statement.get('id')}")
                        return result
                    else:
                        error_text = await response.text()
                        raise Exception(f"LRS error {response.status}: {error_text}")
                        
            except Exception as e:
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}")
                if attempt == self.retry_attempts - 1:
                    raise
                await asyncio.sleep(2 ** attempt)  # Exponential backoff
    
    async def get_statements(self, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """
        Retrieve statements from the LRS
        
        Args:
            params: Query parameters for filtering statements
            
        Returns:
            List of xAPI statements
        """
        if not self.session:
            raise RuntimeError("Plugin not initialized")
        
        statements_url = urljoin(self.endpoint, '/xapi/statements')
        
        auth = None
        if self.username and self.password:
            auth = aiohttp.BasicAuth(self.username, self.password)
        
        try:
            async with self.session.get(
                statements_url,
                params=params or {},
                auth=auth
            ) as response:
                
                if response.status == 200:
                    data = await response.json()
                    statements = data.get('statements', [])
                    self.logger.debug(f"Retrieved {len(statements)} statements")
                    return statements
                else:
                    error_text = await response.text()
                    raise Exception(f"Failed to retrieve statements: {response.status} - {error_text}")
                    
        except Exception as e:
            self.logger.error(f"Error retrieving statements: {e}")
            raise
    
    def get_configuration_schema(self) -> Dict[str, Any]:
        """
        Return the configuration schema for this plugin
        
        Returns:
            Dictionary describing required and optional configuration parameters
        """
        return {
            'required': [
                'endpoint'
            ],
            'optional': [
                'username',
                'password', 
                'api_key',
                'timeout',
                'retry_attempts',
                'verify_ssl'
            ],
            'authentication_methods': [
                'basic_auth',  # username + password
                'api_key'      # Bearer token
            ],
            'description': 'Plugin for Your Learning Record Store',
            'example_config': {
                'endpoint': 'https://your-lrs.example.com',
                'username': 'your-username',
                'password': 'your-password',
                'timeout': 30,
                'retry_attempts': 3,
                'verify_ssl': True
            }
        }


# Plugin registration
def create_plugin(config: Dict[str, Any]) -> YourLRSPlugin:
    """Factory function to create plugin instance"""
    return YourLRSPlugin(config)
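
The template builds request URLs with urljoin and a path that starts with a slash. That works when the endpoint is a bare host, but a leading slash replaces any path already present in the endpoint. The sketch below shows the difference; build_url is a hypothetical helper, not part of LearnMCP-xAPI, for deployments where the endpoint carries a path prefix.

```python
from urllib.parse import urljoin


def build_url(endpoint: str, resource: str) -> str:
    """Join endpoint and resource, keeping any path prefix in the endpoint.

    Hypothetical helper for illustration only.
    """
    # A trailing slash on the base and no leading slash on the resource
    # make urljoin append to the base path instead of replacing it.
    return urljoin(endpoint.rstrip('/') + '/', resource.lstrip('/'))


# Plain urljoin with a leading slash drops the "/tenant-a" prefix.
print(urljoin('https://lrs.example.com/tenant-a', '/xapi/about'))

# The helper keeps the prefix.
print(build_url('https://lrs.example.com/tenant-a', '/xapi/about'))
```

If your LRS is always served from the host root, the template's urljoin calls are sufficient as written.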

Step 3: Base Plugin Interface

All plugins must inherit from BaseLRSPlugin. Here's the interface your plugin must implement:

from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any


class BaseLRSPlugin(ABC):
    """Base class for all LRS plugins"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
    
    @abstractmethod
    async def initialize(self) -> None:
        """Initialize the plugin and establish connection"""
        pass
    
    @abstractmethod
    async def cleanup(self) -> None:
        """Cleanup resources when plugin is no longer needed"""
        pass
    
    @abstractmethod
    async def send_statement(self, statement: Dict[str, Any]) -> Dict[str, Any]:
        """Send an xAPI statement to the LRS"""
        pass
    
    @abstractmethod
    async def get_statements(self, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """Retrieve statements from the LRS"""
        pass
    
    @abstractmethod
    def get_configuration_schema(self) -> Dict[str, Any]:
        """Return configuration schema for this plugin"""
        pass
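
To see the contract in action without a network, the following sketch implements the interface with an in-memory list. InMemoryLRSPlugin and demo are hypothetical names, and the base class is repeated in condensed form only so the example runs on its own. Such a test double can be handy when exercising code that depends on a plugin.

```python
import asyncio
import uuid
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class BaseLRSPlugin(ABC):
    """Condensed copy of the interface above so the sketch runs standalone."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config

    @abstractmethod
    async def initialize(self) -> None: ...

    @abstractmethod
    async def cleanup(self) -> None: ...

    @abstractmethod
    async def send_statement(self, statement: Dict[str, Any]) -> Dict[str, Any]: ...

    @abstractmethod
    async def get_statements(self, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: ...

    @abstractmethod
    def get_configuration_schema(self) -> Dict[str, Any]: ...


class InMemoryLRSPlugin(BaseLRSPlugin):
    """Hypothetical plugin that keeps statements in a list instead of an LRS."""

    async def initialize(self) -> None:
        self._store: List[Dict[str, Any]] = []

    async def cleanup(self) -> None:
        self._store.clear()

    async def send_statement(self, statement: Dict[str, Any]) -> Dict[str, Any]:
        # Assign an ID when the caller did not supply one, as the template does.
        stored = {**statement, 'id': statement.get('id') or str(uuid.uuid4())}
        self._store.append(stored)
        return {'status': 'success', 'statement_id': stored['id']}

    async def get_statements(self, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        return list(self._store)

    def get_configuration_schema(self) -> Dict[str, Any]:
        return {'required': [], 'optional': [], 'description': 'In-memory test double'}


async def demo() -> int:
    """Run the full lifecycle: initialize, send, read back, clean up."""
    plugin = InMemoryLRSPlugin({})
    await plugin.initialize()
    try:
        await plugin.send_statement({'verb': {'id': 'http://adlnet.gov/expapi/verbs/experienced'}})
        return len(await plugin.get_statements())
    finally:
        await plugin.cleanup()


print(asyncio.run(demo()))
```

A class that omits any of the abstract methods cannot be instantiated, which is how the base class enforces the contract.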

Configuration Management

Environment Variables

Name your plugin's environment variables with the plugin name in upper case as a prefix, followed by the setting name. LRS_PLUGIN selects which plugin is active:

# Plugin Selection
LRS_PLUGIN=your_lrs

# Your LRS Configuration
YOUR_LRS_ENDPOINT=https://your-lrs.example.com
YOUR_LRS_USERNAME=username
YOUR_LRS_PASSWORD=password
YOUR_LRS_API_KEY=api-key
YOUR_LRS_TIMEOUT=30
YOUR_LRS_RETRY_ATTEMPTS=3
YOUR_LRS_VERIFY_SSL=true
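
Environment values always arrive as strings, so typed settings such as the timeout need converting before they reach the plugin. The sketch below shows one way to collect prefixed variables into a config dictionary. load_plugin_config is a hypothetical helper, and how LearnMCP-xAPI itself reads these variables may differ.

```python
import os
from typing import Any, Dict, Mapping, Optional


def load_plugin_config(prefix: str, environ: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Collect PREFIX_* variables into a plugin config dict (hypothetical helper)."""
    env = os.environ if environ is None else environ
    marker = prefix.upper() + '_'

    # YOUR_LRS_ENDPOINT becomes the key 'endpoint', and so on.
    config: Dict[str, Any] = {
        name[len(marker):].lower(): value
        for name, value in env.items()
        if name.startswith(marker)
    }

    # Convert the settings that the plugin expects as numbers or booleans.
    for key in ('timeout', 'retry_attempts'):
        if key in config:
            config[key] = int(config[key])
    if 'verify_ssl' in config:
        config['verify_ssl'] = config['verify_ssl'].strip().lower() in ('1', 'true', 'yes')

    return config


example_env = {
    'LRS_PLUGIN': 'your_lrs',
    'YOUR_LRS_ENDPOINT': 'https://your-lrs.example.com',
    'YOUR_LRS_TIMEOUT': '30',
    'YOUR_LRS_VERIFY_SSL': 'true',
}
print(load_plugin_config(example_env['LRS_PLUGIN'], example_env))
```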

Configuration File Support

Create a configuration file template in config/plugins/your_lrs.yaml:

# Your LRS Plugin Configuration
endpoint: ${YOUR_LRS_ENDPOINT:-https://your-lrs.example.com}
username: ${YOUR_LRS_USERNAME}
password: ${YOUR_LRS_PASSWORD}
api_key: ${YOUR_LRS_API_KEY}
timeout: ${YOUR_LRS_TIMEOUT:-30}
retry_attempts: ${YOUR_LRS_RETRY_ATTEMPTS:-3}
verify_ssl: ${YOUR_LRS_VERIFY_SSL:-true}

# Plugin-specific settings
custom_setting: ${YOUR_LRS_CUSTOM_SETTING:-default_value}
batch_size: ${YOUR_LRS_BATCH_SIZE:-100}
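
The template uses shell-style placeholders: ${NAME} for a plain lookup and ${NAME:-default} for a lookup with a fallback. This guide does not show how the placeholders are expanded, so the following is only a sketch of the substitution rule, assuming shell semantics where the default applies when the variable is unset or empty. expand_placeholders is a hypothetical name.

```python
import os
import re
from typing import Mapping, Optional

# Matches ${NAME} and ${NAME:-default}.
_PLACEHOLDER = re.compile(r'\$\{([A-Za-z_][A-Za-z0-9_]*)(?::-([^}]*))?\}')


def expand_placeholders(text: str, environ: Optional[Mapping[str, str]] = None) -> str:
    """Replace ${NAME} and ${NAME:-default} with values from the environment."""
    env = os.environ if environ is None else environ

    def substitute(match: 're.Match[str]') -> str:
        name, default = match.group(1), match.group(2)
        value = env.get(name)
        if value:
            return value
        # Unset or empty: fall back to the default, or to an empty string.
        return default if default is not None else ''

    return _PLACEHOLDER.sub(substitute, text)


env = {'YOUR_LRS_TIMEOUT': '60'}
print(expand_placeholders('timeout: ${YOUR_LRS_TIMEOUT:-30}', env))
print(expand_placeholders('retry_attempts: ${YOUR_LRS_RETRY_ATTEMPTS:-3}', env))
```

Expanding placeholders on the raw text before YAML parsing keeps the parser unaware of the syntax, at the cost of every substituted value starting life as a string.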

Advanced Features

Authentication Methods

Basic Authentication

async def _authenticate_basic(self) -> aiohttp.BasicAuth:
    """Create basic authentication object"""
    return aiohttp.BasicAuth(self.username, self.password)

OAuth 2.0

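async def _authenticate_oauth(self) -> str:
    """Obtain OAuth 2.0 access token via the client credentials grant.

    Assumes __init__ also reads client_id and client_secret from the config;
    the basic template above does not set them.
    """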
    token_url = urljoin(self.endpoint, '/oauth/token')
    
    data = {
        'grant_type': 'client_credentials',
        'client_id': self.client_id,
        'client_secret': self.client_secret
    }
    
    async with self.session.post(token_url, data=data) as response:
        if response.status == 200:
            token_data = await response.json()
            return token_data['access_token']
        else:
            raise Exception(f"OAuth authentication failed: {response.status}")

Custom Authentication

async def _authenticate_custom(self) -> Dict[str, str]:
    """Implement custom authentication method"""
    # Your custom authentication logic here.
    # self.client_id must be read from the config in __init__ first.
    auth_headers = {
        'X-API-Key': self.api_key,
        'X-Client-ID': self.client_id
    }
    return auth_headers
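
When a plugin supports several of these methods, it helps to pick exactly one per configuration so that conflicting credentials are never sent together. The sketch below chooses between an API key and basic authentication and builds the matching Authorization header by hand. build_auth_headers is a hypothetical helper, and giving the API key precedence is an assumption rather than documented LearnMCP-xAPI behavior.

```python
import base64
from typing import Any, Dict


def build_auth_headers(config: Dict[str, Any]) -> Dict[str, str]:
    """Pick one Authorization header from the config (hypothetical helper)."""
    # Assumed precedence: an API key wins over username/password.
    if config.get('api_key'):
        return {'Authorization': f"Bearer {config['api_key']}"}

    if config.get('username') and config.get('password'):
        # HTTP Basic: base64 of "username:password".
        raw = f"{config['username']}:{config['password']}".encode('utf-8')
        return {'Authorization': 'Basic ' + base64.b64encode(raw).decode('ascii')}

    raise ValueError('Either username/password or API key is required')


print(build_auth_headers({'api_key': 'example-key'}))
```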

Error Handling

Retry transient network failures with exponential backoff, and fail immediately on everything else:

async def _execute_with_retry(self, operation, *args, **kwargs):
    """Execute operation with retry logic"""
    attempts = max(self.retry_attempts, 1)  # Always try at least once
    
    for attempt in range(attempts):
        try:
            return await operation(*args, **kwargs)
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            # Request timeouts raise asyncio.TimeoutError, not ClientError
            if attempt == attempts - 1:
                # Out of attempts: re-raise now instead of sleeping first
                self.logger.error(f"Attempt {attempt + 1} failed: {e}. Giving up")
                raise
            wait_time = min(2 ** attempt, 30)  # Exponential backoff with max 30s
            self.logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s")
            await asyncio.sleep(wait_time)
        except Exception as e:
            # Don't retry on non-network errors
            self.logger.error(f"Non-retryable error: {e}")
            raise
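
Network exceptions are only one signal. An HTTP response can also indicate whether a retry is worthwhile: a 400 will fail the same way every time, while a 503 may succeed a moment later. The sketch below separates the two cases and lists the backoff delays for a given number of attempts. Both function names and the exact set of retryable status codes are assumptions for illustration.

```python
from typing import List


def is_retryable_status(status: int) -> bool:
    """Treat request timeouts, rate limiting and server errors as transient.

    This policy is an assumption; adjust it to what your LRS documents.
    """
    return status in (408, 429) or 500 <= status <= 599


def backoff_delays(attempts: int, cap: float = 30.0) -> List[float]:
    """Delays slept between attempts: 1s, 2s, 4s, ... capped at `cap`."""
    # There is one fewer delay than attempts, because nothing follows the last try.
    return [min(float(2 ** n), cap) for n in range(max(attempts - 1, 0))]


for code in (400, 401, 429, 503):
    print(code, 'retry' if is_retryable_status(code) else 'fail fast')
print(backoff_delays(4))
```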

Batch Processing

For high-volume scenarios, implement batch processing:

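async def send_statements_batch(self, statements: List[Dict[str, Any]]) -> List[Any]:
    """Send multiple statements in batches and collect the LRS responses"""
    batch_url = urljoin(self.endpoint, '/xapi/statements')
    
    # Use the same basic auth handling as send_statement
    auth = None
    if self.username and self.password:
        auth = aiohttp.BasicAuth(self.username, self.password)
    
    # Split into chunks if necessary
    batch_size = self.config.get('batch_size', 100)
    results = []
    
    for i in range(0, len(statements), batch_size):
        batch = statements[i:i + batch_size]
        
        async with self.session.post(batch_url, json=batch, auth=auth) as response:
            if response.status == 200:
                # The body is expected to be a JSON list (typically statement IDs)
                batch_result = await response.json()
                results.extend(batch_result)
            elif response.status == 204:
                # No Content: there is no body to parse
                continue
            else:
                raise Exception(f"Batch failed: {response.status}")
    
    return results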
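
The chunking step can be pulled out into a small generator, which makes it easy to test separately from the HTTP code. This is a minimal sketch; chunked is a hypothetical helper name.

```python
from typing import Any, Dict, Iterator, List


def chunked(statements: List[Dict[str, Any]], batch_size: int = 100) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of at most batch_size statements."""
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')
    for start in range(0, len(statements), batch_size):
        yield statements[start:start + batch_size]


statements = [{'id': str(n)} for n in range(5)]
for batch in chunked(statements, batch_size=2):
    print([s['id'] for s in batch])
```

Rejecting a batch size below one up front avoids the less obvious error that range raises for a zero step.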

Testing Your Plugin

Unit Tests

Create unit tests in tests/test_your_lrs_plugin.py:

import pytest
import asyncio
from unittest.mock import AsyncMock, patch

from learnmcp_xapi.plugins.your_lrs import YourLRSPlugin


class TestYourLRSPlugin:
    
    @pytest.fixture
    def plugin_config(self):
        return {
            'endpoint': 'https://test-lrs.example.com',
            'username': 'test_user',
            'password': 'test_pass',
            'timeout': 30,
            'retry_attempts': 3
        }
    
    @pytest.fixture
    def plugin(self, plugin_config):
        return YourLRSPlugin(plugin_config)
    
    def test_plugin_initialization(self, plugin):
        assert plugin.endpoint == 'https://test-lrs.example.com'
        assert plugin.username == 'test_user'
        assert plugin.timeout == 30
    
    def test_config_validation(self):
        with pytest.raises(ValueError):
            YourLRSPlugin({})  # Missing required config
    
    @pytest.mark.asyncio
    async def test_send_statement(self, plugin):
        # Mock the HTTP session
        with patch.object(plugin, 'session') as mock_session:
            mock_response = AsyncMock()
            mock_response.status = 200
            mock_response.text.return_value = '{"status": "ok"}'
            mock_session.post.return_value.__aenter__.return_value = mock_response
            
            statement = {
                'actor': {'name': 'Test User'},
                'verb': {'id': 'http://adlnet.gov/expapi/verbs/experienced'},
                'object': {'id': 'http://example.com/test'}
            }
            
            result = await plugin.send_statement(statement)
            assert result['status'] == 'success'

Integration Tests

Create integration tests that work with a real LRS instance:

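import os

import pytest

from learnmcp_xapi.plugins.your_lrs import YourLRSPlugin


def create_test_statement():
    """Build a minimal statement; xAPI actors need an identifier such as mbox"""
    return {
        'actor': {'name': 'Test User', 'mbox': 'mailto:test.user@example.com'},
        'verb': {'id': 'http://adlnet.gov/expapi/verbs/experienced'},
        'object': {'id': 'http://example.com/test'}
    }


@pytest.mark.integration
@pytest.mark.asyncio
async def test_real_lrs_integration():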
    """Test with actual LRS instance"""
    config = {
        'endpoint': os.getenv('TEST_LRS_ENDPOINT'),
        'username': os.getenv('TEST_LRS_USERNAME'),
        'password': os.getenv('TEST_LRS_PASSWORD')
    }
    
    if not all(config.values()):
        pytest.skip("Integration test credentials not provided")
    
    plugin = YourLRSPlugin(config)
    await plugin.initialize()
    
    try:
        # Test statement sending
        statement = create_test_statement()
        result = await plugin.send_statement(statement)
        assert result['status'] == 'success'
        
        # Test statement retrieval
        statements = await plugin.get_statements()
        assert len(statements) > 0
        
    finally:
        await plugin.cleanup()

Plugin Registration

Update Plugin Manager

Add your plugin to the plugin registry in learnmcp_xapi/plugins/__init__.py:

from .your_lrs import YourLRSPlugin

AVAILABLE_PLUGINS = {
    'lrsql': 'learnmcp_xapi.plugins.lrsql',
    'ralph': 'learnmcp_xapi.plugins.ralph', 
    'veracity': 'learnmcp_xapi.plugins.veracity',
    'your_lrs': 'learnmcp_xapi.plugins.your_lrs'  # Add your plugin
}

def create_plugin(plugin_name: str, config: dict):
    """Factory function to create plugin instances"""
    if plugin_name == 'your_lrs':
        return YourLRSPlugin(config)
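    # ... other plugins
    # Fail loudly instead of silently returning None for an unknown name
    raise ValueError(f"Unknown LRS plugin: {plugin_name}")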
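
Because the registry maps names to module paths and the template ends with a module-level create_plugin factory, the lookup can also be done dynamically instead of with one if branch per plugin. The following is a sketch of that idea, not the project's actual loader. load_plugin and the stand-in module are hypothetical, and the registry is repeated only so the example runs on its own.

```python
import importlib
import sys
import types
from typing import Any, Dict, Mapping

AVAILABLE_PLUGINS = {
    'your_lrs': 'learnmcp_xapi.plugins.your_lrs',
}


def load_plugin(plugin_name: str, config: Dict[str, Any],
                registry: Mapping[str, str] = AVAILABLE_PLUGINS) -> Any:
    """Import the registered module and call its create_plugin factory."""
    try:
        module_path = registry[plugin_name]
    except KeyError:
        known = ', '.join(sorted(registry))
        raise ValueError(f"Unknown LRS plugin '{plugin_name}'. Available: {known}") from None

    # The module is imported only when its plugin is actually requested.
    module = importlib.import_module(module_path)
    return module.create_plugin(config)


# Demo with a stand-in module so the sketch runs without LearnMCP-xAPI installed.
fake_module = types.ModuleType('fake_lrs_plugin')
fake_module.create_plugin = lambda config: ('fake plugin', config)
sys.modules['fake_lrs_plugin'] = fake_module

print(load_plugin('fake', {'endpoint': 'https://lrs.example.com'}, {'fake': 'fake_lrs_plugin'}))
```

Importing lazily means an optional dependency of one plugin does not break users who selected a different plugin.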

Documentation

Update the main README.md to include your plugin in the supported LRS list.

Best Practices

Security

  • Never log sensitive information like passwords or API keys
  • Use secure defaults (SSL verification enabled, reasonable timeouts)
  • Validate all inputs and sanitize configuration values
  • Handle authentication errors gracefully without exposing credentials
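
One way to follow the first point is to mask secrets before a configuration dictionary ever reaches a log call. This is a minimal sketch; redact_config and the list of sensitive keys are assumptions based on the settings used in this guide.

```python
from typing import Any, Dict

# Keys whose values should never appear in logs (assumed list).
SENSITIVE_KEYS = frozenset({'password', 'api_key', 'client_secret'})


def redact_config(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the config that is safe to log."""
    return {
        key: '***' if key in SENSITIVE_KEYS and value else value
        for key, value in config.items()
    }


config = {
    'endpoint': 'https://your-lrs.example.com',
    'username': 'your-username',
    'password': 'your-password',
    'api_key': None,
}
print(redact_config(config))
```

Unset secrets are left as they are, so the log still shows which authentication method was configured.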

Performance

  • Implement connection pooling for high-volume scenarios
  • Use async/await properly to avoid blocking operations
  • Implement exponential backoff for retry logic
  • Add batch processing for multiple statements

Error Handling

  • Provide meaningful error messages that help users diagnose issues
  • Distinguish between retryable and non-retryable errors
  • Log at appropriate levels (debug for verbose info, error for failures)
  • Handle network timeouts and connection issues

Configuration

  • Use environment variables with sensible defaults
  • Provide clear configuration documentation
  • Validate configuration on plugin initialization
  • Support both environment variables and configuration files

Contributing Your Plugin

Once your plugin is complete and tested:

  1. Fork the LearnMCP-xAPI repository
  2. Create a feature branch: git checkout -b feature/your-lrs-plugin
  3. Add your plugin files and tests
  4. Update documentation (README, wiki pages)
  5. Run the test suite: pytest
  6. Submit a pull request with a clear description

Pull Request Checklist

  • Plugin follows the established interface
  • Comprehensive unit tests included
  • Integration tests provided (if possible)
  • Documentation updated
  • Configuration examples provided
  • Error handling implemented
  • Security best practices followed
  • Performance considerations addressed

Examples and References

Existing Plugin Examples

Study the existing plugins for implementation patterns:

  • LRS SQL Plugin (lrsql.py): Simple HTTP API with basic auth
  • Ralph LRS Plugin (ralph.py): Multiple auth methods (Basic + OIDC)
  • Veracity Plugin (veracity.py): Enterprise features and multi-tenant support

Common LRS Patterns

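Most xAPI-compliant LRSs expose the same resources. The /xapi prefix shown here is common but varies between deployments, so check your LRS's documentation for the exact base path: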

  • About Endpoint: /xapi/about - LRS information
  • Statements Endpoint: /xapi/statements - Send/retrieve statements
  • Activities Endpoint: /xapi/activities - Activity definitions
  • Agents Endpoint: /xapi/agents - Actor information
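
When retrieving from the statements endpoint, the xAPI statement result carries a more field alongside statements: a relative URL for the next page, or an empty string when nothing is left. The sketch below follows that link until it runs out. collect_statements, the injected fetch_page callable, and the page limit are hypothetical, chosen so the paging logic can be shown without a live LRS.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List
from urllib.parse import urljoin

# fetch_page takes a URL and returns the decoded JSON body of that page.
FetchPage = Callable[[str], Awaitable[Dict[str, Any]]]


async def collect_statements(fetch_page: FetchPage, endpoint: str,
                             first_url: str, max_pages: int = 10) -> List[Dict[str, Any]]:
    """Gather statements across pages by following the 'more' link."""
    statements: List[Dict[str, Any]] = []
    url = first_url
    for _ in range(max_pages):  # Upper bound guards against endless paging
        page = await fetch_page(url)
        statements.extend(page.get('statements', []))
        more = page.get('more')
        if not more:
            break
        # 'more' is relative, so resolve it against the LRS endpoint.
        url = urljoin(endpoint, more)
    return statements


# Stand-in for real HTTP calls: two pages keyed by URL.
PAGES = {
    'https://lrs.example.com/xapi/statements': {
        'statements': [{'id': 'a'}], 'more': '/xapi/statements?more=2'},
    'https://lrs.example.com/xapi/statements?more=2': {
        'statements': [{'id': 'b'}], 'more': ''},
}


async def fake_fetch(url: str) -> Dict[str, Any]:
    return PAGES[url]


result = asyncio.run(collect_statements(
    fake_fetch, 'https://lrs.example.com', 'https://lrs.example.com/xapi/statements'))
print([s['id'] for s in result])
```

In a real plugin, fetch_page would wrap the session's GET request and return the parsed JSON body.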

By following this guide and examining the existing plugins, you'll be able to create robust, maintainable plugins that extend LearnMCP-xAPI's capabilities to support any xAPI-compliant Learning Record Store.