Elasticsearch Source
This Elasticsearch source connector tracks its processing progress in persistent state, so ingestion can resume after a failure or restart without reprocessing data.
Features
- Incremental Data Processing: Track last processed timestamp to avoid reprocessing data
- Cursor-based Pagination: Support for document ID-based cursors
- Scroll-based Pagination: Support for Elasticsearch scroll API
- Error Tracking: Monitor error counts and last error messages
- Processing Statistics: Track performance metrics and processing times
- Persistent State Storage: Multiple storage backends (file, Elasticsearch, Redis)
- Auto-save: Configurable automatic state persistence
- State Recovery: Resume processing from last known position after restart
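The incremental processing feature can be illustrated with a small sketch. It assumes the connector turns the stored last_poll_timestamp into an Elasticsearch range query on the configured timestamp_field; the helper name incremental_query is hypothetical and not part of the connector's API, and the sketch does no JSON escaping of its inputs.

```rust
/// Hypothetical helper: builds a search body that only matches documents
/// newer than the last processed timestamp. On the first run, when no
/// timestamp is stored yet, it falls back to `match_all`.
/// Note: inputs are inserted verbatim, so they must not contain quotes.
fn incremental_query(timestamp_field: &str, last_poll_timestamp: Option<&str>) -> String {
    match last_poll_timestamp {
        // Only fetch documents strictly newer than the stored timestamp.
        Some(ts) => format!(
            r#"{{"query":{{"range":{{"{field}":{{"gt":"{ts}"}}}}}},"sort":[{{"{field}":"asc"}}]}}"#,
            field = timestamp_field,
            ts = ts
        ),
        // No stored state: read everything, oldest first.
        None => format!(
            r#"{{"query":{{"match_all":{{}}}},"sort":[{{"{field}":"asc"}}]}}"#,
            field = timestamp_field
        ),
    }
}
```

Sorting ascending on the timestamp field means the last document in each batch carries the value to store as the next last_poll_timestamp.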
Configuration
Basic Configuration
type = "source"
key = "elasticsearch"
enabled = true
version = 0
name = "Elasticsearch source"
path = "target/release/libiggy_connector_elasticsearch_source"
[[streams]]
stream = "elasticsearch_stream"
topic = "documents"
schema = "json"
batch_length = 100
linger_time = "5ms"
[plugin_config]
url = "http://localhost:9200"
index = "logs-*"
polling_interval = "30s"
batch_size = 100
timestamp_field = "@timestamp"
query = {
"match_all": {}
}
State Management Configuration
[plugin_config]
# ... basic config ...
state = {
enabled = true
storage_type = "file" # "file", "elasticsearch", "redis"
storage_config = {
base_path = "./connector_states" # for file storage
# index = "connector_states" # for elasticsearch storage
# url = "redis://localhost:6379" # for redis storage
}
state_id = "elasticsearch_logs_connector"
auto_save_interval = "5m"
tracked_fields = [
"last_poll_timestamp",
"last_document_id",
"total_documents_fetched"
]
}
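The interval values in the configuration ("5ms", "30s", "5m") are strings made of a number and a unit. How the connector parses them is not stated here; the following is a minimal sketch of one way to interpret such values, using a hypothetical parse_interval function that accepts only the units ms, s, m and h.

```rust
use std::time::Duration;

/// Hypothetical helper: parses strings such as "5ms", "30s" or "5m".
/// Returns `None` for a missing number, a missing unit, or an unknown unit.
fn parse_interval(s: &str) -> Option<Duration> {
    let s = s.trim();
    // Position of the first non-digit character, where the unit starts.
    let split = s.find(|c: char| !c.is_ascii_digit())?;
    let (number, unit) = s.split_at(split);
    let value: u64 = number.parse().ok()?;
    match unit {
        "ms" => Some(Duration::from_millis(value)),
        "s" => Some(Duration::from_secs(value)),
        "m" => Some(Duration::from_secs(value.checked_mul(60)?)),
        "h" => Some(Duration::from_secs(value.checked_mul(3600)?)),
        _ => None,
    }
}
```

Under this sketch, auto_save_interval = "5m" corresponds to 300 seconds, and a bare number without a unit is rejected rather than guessed.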
State Information
The connector tracks the following state information:
Processing State
- last_poll_timestamp: Last successful poll timestamp
- total_documents_fetched: Total number of documents processed
- poll_count: Number of polling cycles executed
- last_document_id: Last processed document ID (for cursor pagination)
- last_scroll_id: Last scroll ID (for scroll pagination)
- last_offset: Last processed offset
Error Tracking
- error_count: Total number of errors encountered
- last_error: Last error message
Performance Statistics
- total_bytes_processed: Total bytes processed
- avg_batch_processing_time_ms: Average processing time per batch
- last_successful_poll: Timestamp of last successful poll
- empty_polls_count: Number of polls that returned no documents
- successful_polls_count: Number of successful polls
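To show how counters like these can be maintained without keeping every sample, here is a minimal sketch of an incremental average. The struct and the record_poll method are hypothetical, not the connector's actual types. The sketch also assumes that empty polls are counted separately from successful ones and do not affect the average, which the source does not confirm.

```rust
/// Hypothetical mirror of the performance statistics fields.
#[derive(Debug, Default)]
struct ProcessingStats {
    total_bytes_processed: u64,
    avg_batch_processing_time_ms: f64,
    empty_polls_count: u64,
    successful_polls_count: u64,
}

impl ProcessingStats {
    /// Records one finished poll cycle.
    /// Assumption: a poll with zero documents counts as "empty" only.
    fn record_poll(&mut self, documents: usize, bytes: u64, elapsed_ms: f64) {
        if documents == 0 {
            self.empty_polls_count += 1;
            return;
        }
        self.successful_polls_count += 1;
        self.total_bytes_processed += bytes;
        // Incremental mean: new_avg = old_avg + (sample - old_avg) / n.
        let n = self.successful_polls_count as f64;
        self.avg_batch_processing_time_ms +=
            (elapsed_ms - self.avg_batch_processing_time_ms) / n;
    }
}
```

The incremental form needs only the previous average and the batch count, so the persisted state stays small however long the connector runs.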
Storage Backends
File Storage (Default)
state = {
enabled = true
storage_type = "file"
storage_config = {
base_path = "./connector_states"
}
}
Elasticsearch Storage
state = {
enabled = true
storage_type = "elasticsearch"
storage_config = {
index = "connector_states"
url = "http://localhost:9200"
}
}
Redis Storage
state = {
enabled = true
storage_type = "redis"
storage_config = {
url = "redis://localhost:6379"
key_prefix = "connector_states:"
}
}
Usage Examples
Basic Usage with State Management
use elasticsearch_source::{ElasticsearchSource, StateManagerExt};
// Create connector with state management enabled
let mut connector = ElasticsearchSource::new(id, config);
// Open connector (automatically loads state if available)
connector.open().await?;
// Start polling (automatically saves state)
let messages = connector.poll().await?;
// Close connector (automatically saves final state)
connector.close().await?;
Manual State Management
use elasticsearch_source::{ElasticsearchSource, StateManagerExt};
let mut connector = ElasticsearchSource::new(id, config);
// Load state manually
connector.load_state().await?;
// Get current state
let state = connector.get_state().await?;
println!("Current state: {:?}", state);
// Export state to JSON
let state_json = connector.export_state().await?;
println!("State JSON: {}", serde_json::to_string_pretty(&state_json)?);
// Import state from JSON
connector.import_state(state_json).await?;
// Reset state
connector.reset_state().await?;
State Manager Utilities
use elasticsearch_source::{ElasticsearchSource, StateManagerExt};
let connector = ElasticsearchSource::new(id, config);
// Get state manager
if let Some(state_manager) = connector.get_state_manager() {
// Get state statistics
let stats = state_manager.get_state_stats().await?;
println!("Total states: {}", stats.total_states);
// Clean up old states (older than 30 days)
let deleted_count = state_manager.cleanup_old_states(30).await?;
println!("Deleted {} old states", deleted_count);
}
State File Format
State files are stored as JSON with the following structure:
{
"id": "elasticsearch_logs_connector",
"last_updated": "2024-01-15T10:30:00Z",
"version": 1,
"data": {
"last_poll_timestamp": "2024-01-15T10:30:00Z",
"total_documents_fetched": 15000,
"poll_count": 150,
"last_document_id": "doc_12345",
"last_scroll_id": "scroll_abc123",
"last_offset": 15000,
"error_count": 2,
"last_error": "Connection timeout",
"processing_stats": {
"total_bytes_processed": 1048576,
"avg_batch_processing_time_ms": 125.5,
"last_successful_poll": "2024-01-15T10:30:00Z",
"empty_polls_count": 5,
"successful_polls_count": 145
}
},
"metadata": {
"connector_type": "elasticsearch_source",
"connector_id": 1,
"index": "logs-*",
"url": "http://localhost:9200"
}
}
Best Practices
- State ID Uniqueness: Use unique state IDs for different connector instances
- Auto-save Interval: Match the auto-save interval to your data volume. A shorter interval generally loses less progress after a crash but writes state more often
- Storage Location: Use persistent storage locations for production deployments
- State Cleanup: Regularly clean up old state files to prevent disk space issues
- Error Handling: Monitor error counts and implement appropriate alerting
- Backup: Regularly back up state files for disaster recovery
Troubleshooting
Common Issues
- State Not Loading: Check file permissions and storage path
- State Corruption: Delete corrupted state files to start fresh
- Performance Issues: Adjust auto-save interval and batch sizes
- Storage Full: Implement state cleanup policies
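One common cause of corrupted state files is a crash in the middle of a write. The sketch below shows a general technique for avoiding that with file storage: write to a temporary file, flush it, then rename it over the real file. The function name save_state_atomically is hypothetical, and whether the connector's file backend already works this way is not stated here.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical helper: replaces the state file in one step so readers
/// see either the old contents or the new ones, never a partial write.
fn save_state_atomically(path: &Path, json: &str) -> io::Result<()> {
    // Make sure the storage directory (e.g. ./connector_states) exists.
    if let Some(parent) = path.parent() {
        fs::create_dir_all(parent)?;
    }
    // Temporary file next to the target: "<name>.tmp".
    let mut tmp_name = path.as_os_str().to_owned();
    tmp_name.push(".tmp");
    let tmp = PathBuf::from(tmp_name);

    let mut file = File::create(&tmp)?;
    file.write_all(json.as_bytes())?;
    // Flush data to disk before the rename makes it visible.
    file.sync_all()?;
    drop(file);

    // Rename replaces an existing destination file.
    fs::rename(&tmp, path)
}
```

Keeping the temporary file in the same directory as the target keeps both on the same filesystem, which is what allows the rename to be a single replace operation.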
Monitoring
Monitor the following metrics:
- State save/load success rates
- Processing statistics
- Error counts and types
- Storage usage for state files
Migration
To migrate from a connector without state management:
- Add state configuration to your connector config
- Set enabled = true in state config
- Restart the connector
- The connector will start tracking state from the next poll cycle
To migrate between storage backends:
- Export state from current storage
- Update storage configuration
- Import state to new storage
- Restart connector
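The backend migration steps above can be sketched as a copy between two storage implementations. Everything in this example is hypothetical: the StateStorage trait, the InMemoryStorage stand-in, and migrate_state illustrate the export, import and verify sequence and are not the connector's real storage API.

```rust
use std::collections::HashMap;

/// Hypothetical storage abstraction: state is kept as a JSON string per ID.
trait StateStorage {
    fn load(&self, state_id: &str) -> Option<String>;
    fn save(&mut self, state_id: &str, json: String);
}

/// In-memory stand-in for a real backend (file, Elasticsearch, Redis).
#[derive(Default)]
struct InMemoryStorage {
    states: HashMap<String, String>,
}

impl StateStorage for InMemoryStorage {
    fn load(&self, state_id: &str) -> Option<String> {
        self.states.get(state_id).cloned()
    }

    fn save(&mut self, state_id: &str, json: String) {
        self.states.insert(state_id.to_string(), json);
    }
}

/// Exports the state from `from`, imports it into `to`, then reads it
/// back to confirm that the new backend holds an identical copy.
fn migrate_state(
    from: &dyn StateStorage,
    to: &mut dyn StateStorage,
    state_id: &str,
) -> Result<(), String> {
    let json = from
        .load(state_id)
        .ok_or_else(|| format!("no state found for '{}'", state_id))?;
    to.save(state_id, json.clone());
    match to.load(state_id) {
        Some(copy) if copy == json => Ok(()),
        _ => Err(format!("verification failed for '{}'", state_id)),
    }
}
```

Reading the state back before switching the configuration gives a chance to catch a failed import while the old backend is still intact.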