Plugin Development
LearnMCP-xAPI uses a modular plugin architecture that makes it easy to add support for new Learning Record Stores (LRS). This guide will walk you through creating your own LRS plugin, from understanding the architecture to implementing and testing a complete plugin.
The plugin system is designed around a simple interface that standardizes how LearnMCP-xAPI communicates with different Learning Record Stores while allowing each plugin to handle the specific requirements of its target LRS.
- Plugin Interface: Abstract base class defining the contract all plugins must implement
- Plugin Manager: Handles plugin discovery, loading, and lifecycle management
- Configuration System: Unified configuration management with plugin-specific settings
- Error Handling: Standardized error handling and retry logic across all plugins
Each plugin consists of:
- Plugin Class: Main implementation inheriting from the base plugin interface
- Configuration Schema: Defines required and optional configuration parameters
- Authentication Handler: Manages LRS-specific authentication methods
- Statement Processor: Handles xAPI statement formatting and submission
- Error Handler: Plugin-specific error handling and recovery logic
Create your plugin in the learnmcp_xapi/plugins/ directory:
learnmcp_xapi/plugins/
├── __init__.py
├── base.py # Base plugin interface
├── lrsql.py # LRS SQL plugin
├── ralph.py # Ralph LRS plugin
├── veracity.py # Veracity LRS plugin
└── your_lrs.py # Your new plugin
Start with this template for your new plugin:
"""
Your LRS Plugin for LearnMCP-xAPI
Implements integration with Your Learning Record Store
"""
import asyncio
import json
import logging
from typing import Dict, List, Optional, Any
from urllib.parse import urljoin

import aiohttp
from aiohttp import ClientSession, ClientTimeout

from .base import BaseLRSPlugin


class YourLRSPlugin(BaseLRSPlugin):
    """
    Plugin for Your Learning Record Store

    Supports:
    - Basic Authentication
    - OAuth 2.0 (if applicable)
    - Custom authentication methods
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the Your LRS plugin

        Args:
            config: Plugin configuration dictionary
        """
        super().__init__(config)
        self.logger = logging.getLogger(__name__)

        # Extract configuration
        self.endpoint = config.get('endpoint')
        self.username = config.get('username')
        self.password = config.get('password')
        self.api_key = config.get('api_key')
        self.timeout = config.get('timeout', 30)
        self.retry_attempts = config.get('retry_attempts', 3)
        self.verify_ssl = config.get('verify_ssl', True)

        # Validate required configuration
        self._validate_config()

        # Initialize session
        self.session: Optional[ClientSession] = None

    def _validate_config(self) -> None:
        """Validate plugin configuration"""
        if not self.endpoint:
            raise ValueError("Your LRS endpoint is required")
        if not (self.username and self.password) and not self.api_key:
            raise ValueError("Either username/password or API key is required")

    async def initialize(self) -> None:
        """Initialize the plugin and establish connection"""
        timeout = ClientTimeout(total=self.timeout)
        # aiohttp's verify_ssl keyword is deprecated; ssl=False disables
        # certificate verification, and omitting it keeps the default checks
        if self.verify_ssl:
            connector = aiohttp.TCPConnector()
        else:
            connector = aiohttp.TCPConnector(ssl=False)
        self.session = ClientSession(
            timeout=timeout,
            connector=connector,
            headers=self._get_default_headers()
        )
        # Test connection
        await self._test_connection()
        self.logger.info(f"Successfully connected to Your LRS at {self.endpoint}")

    async def cleanup(self) -> None:
        """Cleanup resources"""
        if self.session:
            await self.session.close()

    def _get_default_headers(self) -> Dict[str, str]:
        """Get default headers for requests"""
        headers = {
            'Content-Type': 'application/json',
            'X-Experience-API-Version': '1.0.3',
            'User-Agent': 'LearnMCP-xAPI/1.0'
        }
        # Add authentication headers
        if self.api_key:
            headers['Authorization'] = f'Bearer {self.api_key}'
        return headers

    async def _test_connection(self) -> None:
        """Test connection to the LRS"""
        try:
            about_url = urljoin(self.endpoint, '/xapi/about')
            auth = None
            if self.username and self.password:
                auth = aiohttp.BasicAuth(self.username, self.password)
            async with self.session.get(about_url, auth=auth) as response:
                if response.status == 200:
                    about_data = await response.json()
                    self.logger.debug(f"LRS About: {about_data}")
                else:
                    raise Exception(f"Connection test failed: {response.status}")
        except Exception as e:
            self.logger.error(f"Failed to connect to Your LRS: {e}")
            raise

    async def send_statement(self, statement: Dict[str, Any]) -> Dict[str, Any]:
        """
        Send an xAPI statement to the LRS

        Args:
            statement: xAPI statement dictionary

        Returns:
            Response from the LRS
        """
        if not self.session:
            raise RuntimeError("Plugin not initialized")

        statements_url = urljoin(self.endpoint, '/xapi/statements')

        # Add statement ID if not present
        if 'id' not in statement:
            import uuid
            statement['id'] = str(uuid.uuid4())

        auth = None
        if self.username and self.password:
            auth = aiohttp.BasicAuth(self.username, self.password)

        for attempt in range(self.retry_attempts):
            try:
                async with self.session.post(
                    statements_url,
                    json=statement,
                    auth=auth
                ) as response:
                    if response.status in [200, 204]:
                        result = {
                            'status': 'success',
                            'statement_id': statement.get('id'),
                            'lrs_response': await response.text()
                        }
                        self.logger.debug(f"Statement sent successfully: {statement.get('id')}")
                        return result
                    else:
                        error_text = await response.text()
                        raise Exception(f"LRS error {response.status}: {error_text}")
            except Exception as e:
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}")
                if attempt == self.retry_attempts - 1:
                    raise
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

    async def get_statements(self, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """
        Retrieve statements from the LRS

        Args:
            params: Query parameters for filtering statements

        Returns:
            List of xAPI statements
        """
        if not self.session:
            raise RuntimeError("Plugin not initialized")

        statements_url = urljoin(self.endpoint, '/xapi/statements')

        auth = None
        if self.username and self.password:
            auth = aiohttp.BasicAuth(self.username, self.password)

        try:
            async with self.session.get(
                statements_url,
                params=params or {},
                auth=auth
            ) as response:
                if response.status == 200:
                    data = await response.json()
                    statements = data.get('statements', [])
                    self.logger.debug(f"Retrieved {len(statements)} statements")
                    return statements
                else:
                    error_text = await response.text()
                    raise Exception(f"Failed to retrieve statements: {response.status} - {error_text}")
        except Exception as e:
            self.logger.error(f"Error retrieving statements: {e}")
            raise

    def get_configuration_schema(self) -> Dict[str, Any]:
        """
        Return the configuration schema for this plugin

        Returns:
            Dictionary describing required and optional configuration parameters
        """
        return {
            'required': [
                'endpoint'
            ],
            'optional': [
                'username',
                'password',
                'api_key',
                'timeout',
                'retry_attempts',
                'verify_ssl'
            ],
            'authentication_methods': [
                'basic_auth',  # username + password
                'api_key'      # Bearer token
            ],
            'description': 'Plugin for Your Learning Record Store',
            'example_config': {
                'endpoint': 'https://your-lrs.example.com',
                'username': 'your-username',
                'password': 'your-password',
                'timeout': 30,
                'retry_attempts': 3,
                'verify_ssl': True
            }
        }


# Plugin registration
def create_plugin(config: Dict[str, Any]) -> YourLRSPlugin:
    """Factory function to create plugin instance"""
    return YourLRSPlugin(config)
All plugins must inherit from BaseLRSPlugin. Here's the interface your plugin must implement:
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Any


class BaseLRSPlugin(ABC):
    """Base class for all LRS plugins"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config

    @abstractmethod
    async def initialize(self) -> None:
        """Initialize the plugin and establish connection"""
        pass

    @abstractmethod
    async def cleanup(self) -> None:
        """Cleanup resources when plugin is no longer needed"""
        pass

    @abstractmethod
    async def send_statement(self, statement: Dict[str, Any]) -> Dict[str, Any]:
        """Send an xAPI statement to the LRS"""
        pass

    @abstractmethod
    async def get_statements(self, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        """Retrieve statements from the LRS"""
        pass

    @abstractmethod
    def get_configuration_schema(self) -> Dict[str, Any]:
        """Return configuration schema for this plugin"""
        pass
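When testing code that depends on the interface, it can help to have a plugin that satisfies the contract without touching the network. The following is a minimal sketch under stated assumptions: `InMemoryLRSPlugin` is a hypothetical name that does not exist in LearnMCP-xAPI, the base class is repeated in abbreviated form only so the block runs on its own, and only a `verb` filter is handled.

```python
import asyncio
import uuid
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class BaseLRSPlugin(ABC):
    """Abbreviated copy of the interface, repeated so this sketch is self-contained."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config

    @abstractmethod
    async def initialize(self) -> None: ...

    @abstractmethod
    async def cleanup(self) -> None: ...

    @abstractmethod
    async def send_statement(self, statement: Dict[str, Any]) -> Dict[str, Any]: ...

    @abstractmethod
    async def get_statements(self, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]: ...

    @abstractmethod
    def get_configuration_schema(self) -> Dict[str, Any]: ...


class InMemoryLRSPlugin(BaseLRSPlugin):
    """Hypothetical plugin that keeps statements in a list instead of calling an LRS."""

    async def initialize(self) -> None:
        self._statements: List[Dict[str, Any]] = []

    async def cleanup(self) -> None:
        self._statements.clear()

    async def send_statement(self, statement: Dict[str, Any]) -> Dict[str, Any]:
        stored = dict(statement)  # copy so the caller's dict is not mutated
        stored.setdefault("id", str(uuid.uuid4()))
        self._statements.append(stored)
        return {"status": "success", "statement_id": stored["id"]}

    async def get_statements(self, params: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
        # Only a "verb" filter is handled here; a real LRS supports many more.
        verb = (params or {}).get("verb")
        return [s for s in self._statements if verb is None or s["verb"]["id"] == verb]

    def get_configuration_schema(self) -> Dict[str, Any]:
        return {"required": [], "optional": []}


async def demo() -> List[Dict[str, Any]]:
    """Run the full lifecycle: initialize, send, retrieve, cleanup."""
    plugin = InMemoryLRSPlugin({})
    await plugin.initialize()
    try:
        await plugin.send_statement({
            "actor": {"mbox": "mailto:learner@example.com"},
            "verb": {"id": "http://adlnet.gov/expapi/verbs/experienced"},
            "object": {"id": "http://example.com/activities/intro"},
        })
        return await plugin.get_statements()
    finally:
        await plugin.cleanup()
```

Calling `asyncio.run(demo())` exercises every abstract method once, which is a quick way to check that a new plugin class can be instantiated at all.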
Your plugin should follow the naming convention for environment variables:
# Plugin Selection
LRS_PLUGIN=your_lrs
# Your LRS Configuration
YOUR_LRS_ENDPOINT=https://your-lrs.example.com
YOUR_LRS_USERNAME=username
YOUR_LRS_PASSWORD=password
YOUR_LRS_API_KEY=api-key
YOUR_LRS_TIMEOUT=30
YOUR_LRS_RETRY_ATTEMPTS=3
YOUR_LRS_VERIFY_SSL=true
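Environment variables arrive as strings, so a plugin usually needs to convert them before use. The sketch below shows one way to collect the prefixed variables above into a configuration dictionary. `load_plugin_config` and `CONVERTERS` are hypothetical names, and how LearnMCP-xAPI itself reads these variables is not shown in this guide.

```python
import os
from typing import Any, Callable, Dict, Mapping, Optional


def _to_bool(value: str) -> bool:
    """Interpret common truthy spellings; everything else is False."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


# Keys mirror the template's configuration; the converters are an assumption.
CONVERTERS: Dict[str, Callable[[str], Any]] = {
    "endpoint": str,
    "username": str,
    "password": str,
    "api_key": str,
    "timeout": int,
    "retry_attempts": int,
    "verify_ssl": _to_bool,
}


def load_plugin_config(prefix: str, environ: Optional[Mapping[str, str]] = None) -> Dict[str, Any]:
    """Build a config dict from variables named <PREFIX>_<KEY>."""
    environ = os.environ if environ is None else environ
    config: Dict[str, Any] = {}
    for key, convert in CONVERTERS.items():
        raw = environ.get(f"{prefix}_{key.upper()}")
        if raw:  # skip unset and empty variables so plugin defaults apply
            config[key] = convert(raw)
    return config
```

Passing an explicit mapping instead of `os.environ` keeps the function easy to test.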
Create a configuration file template in config/plugins/your_lrs.yaml:
# Your LRS Plugin Configuration
endpoint: ${YOUR_LRS_ENDPOINT:-https://your-lrs.example.com}
username: ${YOUR_LRS_USERNAME}
password: ${YOUR_LRS_PASSWORD}
api_key: ${YOUR_LRS_API_KEY}
timeout: ${YOUR_LRS_TIMEOUT:-30}
retry_attempts: ${YOUR_LRS_RETRY_ATTEMPTS:-3}
verify_ssl: ${YOUR_LRS_VERIFY_SSL:-true}
# Plugin-specific settings
custom_setting: ${YOUR_LRS_CUSTOM_SETTING:-default_value}
batch_size: ${YOUR_LRS_BATCH_SIZE:-100}
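The `${VAR:-default}` placeholders follow shell conventions: the default applies when the variable is unset or empty. The sketch below expands them with a regular expression, assuming those shell-style semantics; `expand_placeholders` is a hypothetical helper, and the expansion LearnMCP-xAPI actually performs may differ.

```python
import re
from typing import Mapping

# Matches ${NAME} and ${NAME:-default}
_PLACEHOLDER = re.compile(
    r"\$\{(?P<name>[A-Za-z_][A-Za-z0-9_]*)(?::-(?P<default>[^}]*))?\}"
)


def expand_placeholders(text: str, environ: Mapping[str, str]) -> str:
    """Replace each placeholder with the variable's value or its default."""

    def replace(match: "re.Match[str]") -> str:
        value = environ.get(match.group("name"))
        if value:  # set and non-empty
            return value
        return match.group("default") or ""

    return _PLACEHOLDER.sub(replace, text)
```

The expanded text can then be handed to a YAML parser, which turns values such as `30` and `true` into an integer and a boolean.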
async def _authenticate_basic(self) -> aiohttp.BasicAuth:
    """Create basic authentication object"""
    return aiohttp.BasicAuth(self.username, self.password)

async def _authenticate_oauth(self) -> str:
    """Obtain OAuth 2.0 access token"""
    # Assumes self.client_id and self.client_secret were read from the
    # plugin configuration in __init__ (the template above does not set them)
    token_url = urljoin(self.endpoint, '/oauth/token')
    data = {
        'grant_type': 'client_credentials',
        'client_id': self.client_id,
        'client_secret': self.client_secret
    }
    async with self.session.post(token_url, data=data) as response:
        if response.status == 200:
            token_data = await response.json()
            return token_data['access_token']
        else:
            raise Exception(f"OAuth authentication failed: {response.status}")

async def _authenticate_custom(self) -> Dict[str, str]:
    """Implement custom authentication method"""
    # Your custom authentication logic here
    auth_headers = {
        'X-API-Key': self.api_key,
        'X-Client-ID': self.client_id
    }
    return auth_headers
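A plugin that supports several methods has to pick one from its configuration. The sketch below chooses between a Bearer token and Basic authentication and returns the matching `Authorization` header. `build_auth_headers` is a hypothetical helper, and preferring the API key when both are configured is an assumption, not documented behavior.

```python
import base64
from typing import Any, Dict


def build_auth_headers(config: Dict[str, Any]) -> Dict[str, str]:
    """Return an Authorization header for the first configured method."""
    api_key = config.get("api_key")
    username = config.get("username")
    password = config.get("password")

    if api_key:
        return {"Authorization": f"Bearer {api_key}"}

    if username and password:
        # Basic auth is base64("username:password"); UTF-8 is assumed here.
        raw = f"{username}:{password}".encode("utf-8")
        return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}

    raise ValueError("Either username/password or API key is required")
```

With aiohttp, `aiohttp.BasicAuth(username, password)` builds the same header, so a helper like this is mainly useful for tests or for clients without built-in Basic support.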
Implement robust error handling with appropriate retry logic:
async def _execute_with_retry(self, operation, *args, **kwargs):
    """Execute operation with retry logic"""
    last_exception = None
    for attempt in range(self.retry_attempts):
        try:
            return await operation(*args, **kwargs)
        except aiohttp.ClientError as e:
            last_exception = e
            wait_time = min(2 ** attempt, 30)  # Exponential backoff with max 30s
            self.logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s")
            await asyncio.sleep(wait_time)
        except Exception as e:
            # Don't retry on non-network errors
            self.logger.error(f"Non-retryable error: {e}")
            raise
    raise last_exception
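The helper above also sleeps after the final failed attempt. A possible variation, sketched here with the standard library only, computes the delays up front and re-raises as soon as the last attempt fails. The names `backoff_delays` and `retry` are hypothetical; with aiohttp you would likely pass `retry_on=(aiohttp.ClientError, asyncio.TimeoutError)`.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 30.0) -> List[float]:
    """Delays between attempts: base * 2**n, capped; one fewer than attempts."""
    return [min(base * 2 ** n, cap) for n in range(max(attempts - 1, 0))]


async def retry(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Await operation(), retrying only on the listed exception types."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    delays = backoff_delays(attempts, base)
    for attempt in range(attempts):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error immediately
            await asyncio.sleep(delays[attempt])
    raise AssertionError("unreachable")
```

Exceptions that are not listed in `retry_on` propagate on the first occurrence, which keeps configuration and validation errors from being retried.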
For high-volume scenarios, implement batch processing:
async def send_statements_batch(self, statements: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Send multiple statements in a batch"""
    batch_url = urljoin(self.endpoint, '/xapi/statements')

    # Split into chunks if necessary
    batch_size = self.config.get('batch_size', 100)
    results = []
    for i in range(0, len(statements), batch_size):
        batch = statements[i:i + batch_size]
        async with self.session.post(batch_url, json=batch) as response:
            if response.status == 200:
                # A 200 response carries the stored statement IDs as JSON
                batch_result = await response.json()
                results.extend(batch_result)
            elif response.status != 204:
                # 204 has no body to parse; anything else is an error
                raise Exception(f"Batch failed: {response.status}")
    return results
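The chunking step can be pulled out into a small generator so it can be tested without an LRS. This is an illustrative sketch; `chunked` is a hypothetical helper name.

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])
```

With 250 statements and a batch size of 100, this produces batches of 100, 100 and 50 statements.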
Create unit tests in tests/test_your_lrs_plugin.py:
import pytest
import asyncio
from unittest.mock import AsyncMock, patch

from learnmcp_xapi.plugins.your_lrs import YourLRSPlugin


class TestYourLRSPlugin:
    @pytest.fixture
    def plugin_config(self):
        return {
            'endpoint': 'https://test-lrs.example.com',
            'username': 'test_user',
            'password': 'test_pass',
            'timeout': 30,
            'retry_attempts': 3
        }

    @pytest.fixture
    def plugin(self, plugin_config):
        return YourLRSPlugin(plugin_config)

    def test_plugin_initialization(self, plugin):
        assert plugin.endpoint == 'https://test-lrs.example.com'
        assert plugin.username == 'test_user'
        assert plugin.timeout == 30

    def test_config_validation(self):
        with pytest.raises(ValueError):
            YourLRSPlugin({})  # Missing required config

    @pytest.mark.asyncio
    async def test_send_statement(self, plugin):
        # Mock the HTTP session
        with patch.object(plugin, 'session') as mock_session:
            mock_response = AsyncMock()
            mock_response.status = 200
            mock_response.text.return_value = '{"status": "ok"}'
            mock_session.post.return_value.__aenter__.return_value = mock_response

            statement = {
                'actor': {'name': 'Test User'},
                'verb': {'id': 'http://adlnet.gov/expapi/verbs/experienced'},
                'object': {'id': 'http://example.com/test'}
            }
            result = await plugin.send_statement(statement)
            assert result['status'] == 'success'
Create integration tests that work with a real LRS instance:
import os

import pytest

from learnmcp_xapi.plugins.your_lrs import YourLRSPlugin


@pytest.mark.integration
@pytest.mark.asyncio
async def test_real_lrs_integration():
    """Test with actual LRS instance"""
    config = {
        'endpoint': os.getenv('TEST_LRS_ENDPOINT'),
        'username': os.getenv('TEST_LRS_USERNAME'),
        'password': os.getenv('TEST_LRS_PASSWORD')
    }
    if not all(config.values()):
        pytest.skip("Integration test credentials not provided")

    plugin = YourLRSPlugin(config)
    await plugin.initialize()
    try:
        # Test statement sending
        # create_test_statement() is a helper you supply; this guide does not define it
        statement = create_test_statement()
        result = await plugin.send_statement(statement)
        assert result['status'] == 'success'

        # Test statement retrieval
        statements = await plugin.get_statements()
        assert len(statements) > 0
    finally:
        await plugin.cleanup()
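The integration test calls `create_test_statement()`, which this guide does not define. One possible implementation is sketched below. The actor address and activity ID are placeholder assumptions, and an actor needs an identifier such as `mbox` for a real LRS to accept the statement.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict


def create_test_statement(
    activity_id: str = "http://example.com/activities/integration-test",
) -> Dict[str, Any]:
    """Build a minimal xAPI statement with a fresh ID and timestamp."""
    return {
        "id": str(uuid.uuid4()),
        "actor": {
            "objectType": "Agent",
            "name": "Test User",
            "mbox": "mailto:test.user@example.com",  # placeholder address
        },
        "verb": {
            "id": "http://adlnet.gov/expapi/verbs/experienced",
            "display": {"en-US": "experienced"},
        },
        "object": {"objectType": "Activity", "id": activity_id},
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
```

Generating a new `id` on every call avoids conflicts when the same test runs repeatedly against one LRS.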
Add your plugin to the plugin registry in learnmcp_xapi/plugins/__init__.py:
from .your_lrs import YourLRSPlugin

AVAILABLE_PLUGINS = {
    'lrsql': 'learnmcp_xapi.plugins.lrsql',
    'ralph': 'learnmcp_xapi.plugins.ralph',
    'veracity': 'learnmcp_xapi.plugins.veracity',
    'your_lrs': 'learnmcp_xapi.plugins.your_lrs'  # Add your plugin
}


def create_plugin(plugin_name: str, config: dict):
    """Factory function to create plugin instances"""
    if plugin_name == 'your_lrs':
        return YourLRSPlugin(config)
    # ... other plugins
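Because the registry maps plugin names to module paths, the factory could also resolve plugins dynamically instead of branching on each name. The sketch below is an assumption about how that might look, not the project's actual loader. `load_plugin` is a hypothetical name, and it relies on every plugin module exposing a `create_plugin(config)` function like the one in the template.

```python
import importlib
from typing import Any, Dict, Mapping

AVAILABLE_PLUGINS: Dict[str, str] = {
    "lrsql": "learnmcp_xapi.plugins.lrsql",
    "ralph": "learnmcp_xapi.plugins.ralph",
    "veracity": "learnmcp_xapi.plugins.veracity",
    "your_lrs": "learnmcp_xapi.plugins.your_lrs",
}


def load_plugin(
    plugin_name: str,
    config: Dict[str, Any],
    registry: Mapping[str, str] = AVAILABLE_PLUGINS,
) -> Any:
    """Import the registered module and call its create_plugin() factory."""
    try:
        module_path = registry[plugin_name]
    except KeyError:
        known = ", ".join(sorted(registry))
        raise ValueError(
            f"Unknown LRS plugin '{plugin_name}'. Available: {known}"
        ) from None
    module = importlib.import_module(module_path)
    return module.create_plugin(config)
```

Importing lazily means a missing optional dependency in one plugin does not prevent the others from loading.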
Update the main README.md to include your plugin in the supported LRS list.
- Never log sensitive information like passwords or API keys
- Use secure defaults (SSL verification enabled, reasonable timeouts)
- Validate all inputs and sanitize configuration values
- Handle authentication errors gracefully without exposing credentials
- Implement connection pooling for high-volume scenarios
- Use async/await properly to avoid blocking operations
- Implement exponential backoff for retry logic
- Add batch processing for multiple statements
- Provide meaningful error messages that help users diagnose issues
- Distinguish between retryable and non-retryable errors
- Log appropriate levels (debug for verbose info, error for failures)
- Handle network timeouts and connection issues
- Use environment variables with sensible defaults
- Provide clear configuration documentation
- Validate configuration on plugin initialization
- Support both environment variables and configuration files
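As an illustration of the first point in the list above, a plugin can mask secrets before its configuration is written to a log. This is a minimal sketch; `redact_config` and the `SENSITIVE_KEYS` set are assumptions and should be extended to cover any other secret fields your plugin uses.

```python
from typing import Any, Dict

# Keys treated as secrets; an assumed list based on the template's config.
SENSITIVE_KEYS = {"password", "api_key", "client_secret"}


def redact_config(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of config that is safe to log."""
    return {
        key: "***" if key in SENSITIVE_KEYS and value else value
        for key, value in config.items()
    }
```

A call such as `logger.debug("Plugin config: %s", redact_config(config))` then shows which settings are present without revealing their values.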
Once your plugin is complete and tested:
- Fork the LearnMCP-xAPI repository
- Create a feature branch: git checkout -b feature/your-lrs-plugin
- Add your plugin files and tests
- Update documentation (README, wiki pages)
- Run the test suite: pytest
- Submit a pull request with a clear description
- Plugin follows the established interface
- Comprehensive unit tests included
- Integration tests provided (if possible)
- Documentation updated
- Configuration examples provided
- Error handling implemented
- Security best practices followed
- Performance considerations addressed
Study the existing plugins for implementation patterns:
- LRS SQL Plugin (lrsql.py): Simple HTTP API with basic auth
- Ralph LRS Plugin (ralph.py): Multiple auth methods (Basic + OIDC)
- Veracity Plugin (veracity.py): Enterprise features and multi-tenant support
Most xAPI-compliant LRS systems follow similar patterns:
- About Endpoint: /xapi/about - LRS information
- Statements Endpoint: /xapi/statements - Send/retrieve statements
- Activities Endpoint: /xapi/activities - Activity definitions
- Agents Endpoint: /xapi/agents - Actor information
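When building these URLs, note that `urljoin` with a path starting in `/` discards any path already present in the configured endpoint. If your LRS is mounted under a sub-path, a helper along these lines keeps it. `xapi_url` is a hypothetical name, and the `/xapi/` prefix follows this guide's convention, which your LRS may not share.

```python
from urllib.parse import urljoin


def xapi_url(endpoint: str, resource: str) -> str:
    """Join endpoint and xapi/<resource> without dropping the endpoint's path."""
    base = endpoint.rstrip("/") + "/"  # a trailing slash makes urljoin append
    return urljoin(base, "xapi/" + resource.lstrip("/"))
```

For an endpoint of `https://example.com/lrs`, this yields `https://example.com/lrs/xapi/about`, whereas `urljoin(endpoint, '/xapi/about')` would yield `https://example.com/xapi/about`.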
By following this guide and examining the existing plugins, you'll be able to create robust, maintainable plugins that extend LearnMCP-xAPI's capabilities to support any xAPI-compliant Learning Record Store.