Job Management API Documentation
Feature: F1.3 Flink Streaming - Job Management
Version: 1.0
Date: October 29, 2025
Status: APPROVED
Priority: P1
Implementation Timeline: Week 2 (Nov 11-17, 2025)
Executive Summary
The Job Management API provides complete control over F1.3 Flink Streaming job lifecycle: submission, monitoring, cancellation, savepoint management, and recovery. The API supports both HTTP REST and native Rust interfaces, with Prometheus/Grafana integration for monitoring.
Key Features
- Job submission with validation
- Job lifecycle management (submit, run, pause, cancel, restart)
- Savepoint management (create, restore, delete)
- Real-time monitoring (metrics, status, logs)
- Resource management (CPU, memory, parallelism)
- Failure recovery (automatic restart, checkpointing)
API Overview
Endpoints
| Method | Endpoint | Description |
|---|---|---|
| POST | /api/v1/jobs | Submit new job |
| GET | /api/v1/jobs | List all jobs |
| GET | /api/v1/jobs/{job_id} | Get job details |
| DELETE | /api/v1/jobs/{job_id} | Cancel job |
| POST | /api/v1/jobs/{job_id}/savepoints | Create savepoint |
| GET | /api/v1/jobs/{job_id}/metrics | Get job metrics |
| GET | /api/v1/jobs/{job_id}/logs | Get job logs |
| POST | /api/v1/jobs/{job_id}/restart | Restart job |
API Reference
1. Submit Job
Submit a new streaming job.
Endpoint: POST /api/v1/jobs
Request Body:
```json
{
  "name": "fraud_detection_pipeline",
  "config": {
    "parallelism": 4,
    "checkpoint_interval_ms": 60000,
    "state_backend": "rocksdb",
    "restart_strategy": "fixed_delay"
  },
  "sources": [
    {
      "type": "kafka",
      "config": {
        "bootstrap_servers": ["localhost:9092"],
        "topic": "transactions",
        "group_id": "fraud_detector"
      }
    }
  ],
  "transformations": [
    { "type": "filter", "config": { "condition": "amount > 10000" } },
    { "type": "cep", "config": { "pattern": "A B+ C", "window": "1h" } }
  ],
  "sinks": [
    {
      "type": "database",
      "config": {
        "connection_string": "postgresql://...",
        "table": "fraud_alerts"
      }
    }
  ]
}
```

Response (201 Created):
```json
{
  "job_id": "job-123e4567-e89b-12d3-a456-426614174000",
  "status": "submitted",
  "submitted_at": "2025-10-29T10:00:00Z",
  "message": "Job submitted successfully"
}
```

Rust API:
```rust
let job_config = JobConfig {
    name: "fraud_detection_pipeline".to_string(),
    parallelism: 4,
    checkpoint_interval: Duration::from_secs(60),
    state_backend: StateBackendType::RocksDb,
    restart_strategy: RestartStrategy::FixedDelay {
        attempts: 3,
        delay: Duration::from_secs(10),
    },
};

let job_id = job_manager.submit_job(job_config).await?;
```

2. List Jobs
Get list of all jobs.
Endpoint: GET /api/v1/jobs
Query Parameters:
- `status` (optional): Filter by status (`running`, `finished`, `failed`, `cancelled`)
- `limit` (optional): Max results (default: 100)
- `offset` (optional): Pagination offset
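For illustration, these query parameters compose into a request URL as sketched below; the base URL is an assumption, so substitute your deployment's host and port.

```rust
/// Build a list-jobs URL from the query parameters above.
/// The base URL is a placeholder assumption, not a documented default.
fn list_jobs_url(base: &str, status: &str, limit: u32, offset: u32) -> String {
    format!("{base}/api/v1/jobs?status={status}&limit={limit}&offset={offset}")
}

fn main() {
    // Second page of 50 running jobs:
    let url = list_jobs_url("http://localhost:8080", "running", 50, 50);
    println!("{url}");
}
```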
Response (200 OK):
```json
{
  "jobs": [
    {
      "job_id": "job-123...",
      "name": "fraud_detection_pipeline",
      "status": "running",
      "submitted_at": "2025-10-29T10:00:00Z",
      "started_at": "2025-10-29T10:00:05Z",
      "uptime_seconds": 3600,
      "parallelism": 4
    },
    {
      "job_id": "job-456...",
      "name": "recommendation_engine",
      "status": "finished",
      "submitted_at": "2025-10-29T08:00:00Z",
      "started_at": "2025-10-29T08:00:03Z",
      "finished_at": "2025-10-29T09:30:00Z",
      "parallelism": 8
    }
  ],
  "total": 2,
  "limit": 100,
  "offset": 0
}
```

Rust API:
```rust
let jobs = job_manager.list_jobs(ListJobsOptions {
    status: Some(JobStatus::Running),
    limit: 100,
    offset: 0,
}).await?;
```

3. Get Job Details
Get detailed information about a specific job.
Endpoint: GET /api/v1/jobs/{job_id}
Response (200 OK):
```json
{
  "job_id": "job-123...",
  "name": "fraud_detection_pipeline",
  "status": "running",
  "config": {
    "parallelism": 4,
    "checkpoint_interval_ms": 60000,
    "state_backend": "rocksdb",
    "restart_strategy": "fixed_delay"
  },
  "submitted_at": "2025-10-29T10:00:00Z",
  "started_at": "2025-10-29T10:00:05Z",
  "uptime_seconds": 3600,
  "metrics": {
    "events_processed": 1523400,
    "throughput_per_sec": 423,
    "latency_p50_ms": 3.2,
    "latency_p99_ms": 12.5,
    "checkpoint_count": 60,
    "last_checkpoint_duration_ms": 45,
    "backpressure_events": 0,
    "restarts": 0
  },
  "tasks": [
    { "task_id": "task-1", "name": "kafka-source", "status": "running", "parallelism": 1 },
    { "task_id": "task-2", "name": "filter", "status": "running", "parallelism": 4 },
    { "task_id": "task-3", "name": "cep-matcher", "status": "running", "parallelism": 4 },
    { "task_id": "task-4", "name": "database-sink", "status": "running", "parallelism": 2 }
  ],
  "checkpoints": [
    {
      "checkpoint_id": "checkpoint-1",
      "timestamp": "2025-10-29T10:59:00Z",
      "duration_ms": 45,
      "size_bytes": 1048576,
      "status": "completed"
    }
  ]
}
```

Rust API:
```rust
let job_details = job_manager.get_job_details(job_id).await?;
println!("Job status: {:?}", job_details.status);
println!("Events processed: {}", job_details.metrics.events_processed);
```

4. Cancel Job
Cancel a running or paused job.
Endpoint: DELETE /api/v1/jobs/{job_id}
Query Parameters:
- `savepoint` (optional): Create savepoint before cancelling (`true`/`false`, default: `false`)
Response (200 OK):
```json
{
  "job_id": "job-123...",
  "status": "cancelled",
  "cancelled_at": "2025-10-29T11:00:00Z",
  "savepoint_id": "savepoint-789..."  // present only if savepoint=true
}
```

Rust API:
```rust
// Cancel without savepoint
job_manager.cancel_job(job_id).await?;

// Cancel with savepoint
let savepoint_id = job_manager.cancel_job_with_savepoint(job_id).await?;
```

5. Create Savepoint
Create a savepoint for a running job (for backup or migration).
Endpoint: POST /api/v1/jobs/{job_id}/savepoints
Request Body (optional):
```json
{ "description": "Pre-upgrade savepoint" }
```

Response (201 Created):
```json
{
  "savepoint_id": "savepoint-789...",
  "job_id": "job-123...",
  "created_at": "2025-10-29T11:00:00Z",
  "size_bytes": 10485760,
  "location": "s3://checkpoints/savepoint-789...",
  "description": "Pre-upgrade savepoint"
}
```

Rust API:
```rust
let savepoint_id = job_manager.create_savepoint(
    job_id,
    Some("Pre-upgrade savepoint".to_string())
).await?;
```

6. Get Job Metrics
Get real-time metrics for a job.
Endpoint: GET /api/v1/jobs/{job_id}/metrics
Query Parameters:
- `window` (optional): Time window (`1m`, `5m`, `1h`, `24h`; default: `5m`)
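Window strings like `5m` map naturally onto durations. A minimal std-only sketch of such a parser, as an illustration of the format; this is a hypothetical helper, not part of the shipped API:

```rust
use std::time::Duration;

/// Parse a window string like "1m", "5m", "1h", or "24h" into a Duration.
/// Hypothetical helper for illustration; the server-side parser may differ.
fn parse_window(s: &str) -> Option<Duration> {
    // Split off the trailing unit character; empty input yields None.
    let (num, unit) = s.split_at(s.len().checked_sub(1)?);
    let n: u64 = num.parse().ok()?;
    match unit {
        "s" => Some(Duration::from_secs(n)),
        "m" => Some(Duration::from_secs(n * 60)),
        "h" => Some(Duration::from_secs(n * 3600)),
        _ => None,
    }
}

fn main() {
    assert_eq!(parse_window("5m"), Some(Duration::from_secs(300)));
    assert_eq!(parse_window("24h"), Some(Duration::from_secs(86_400)));
    assert_eq!(parse_window("5x"), None);
}
```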
Response (200 OK):
```json
{
  "job_id": "job-123...",
  "timestamp": "2025-10-29T11:00:00Z",
  "metrics": {
    "events_processed_total": 1523400,
    "throughput_per_sec": 423,
    "latency": {
      "p50_ms": 3.2,
      "p95_ms": 8.7,
      "p99_ms": 12.5,
      "p999_ms": 25.3
    },
    "checkpoint": {
      "count": 60,
      "last_duration_ms": 45,
      "failures": 0
    },
    "backpressure": {
      "events_total": 0,
      "current_level": 0.0
    },
    "resources": {
      "cpu_usage_percent": 45.2,
      "memory_used_mb": 512,
      "memory_total_mb": 2048
    },
    "errors": {
      "count": 0,
      "rate_per_min": 0.0
    }
  },
  "history": [
    { "timestamp": "2025-10-29T10:55:00Z", "throughput_per_sec": 420 },
    { "timestamp": "2025-10-29T10:56:00Z", "throughput_per_sec": 425 }
  ]
}
```

Rust API:
```rust
let metrics = job_manager.get_job_metrics(
    job_id,
    MetricsWindow::Minutes(5)
).await?;

println!("Throughput: {} events/sec", metrics.throughput_per_sec);
println!("Latency p99: {}ms", metrics.latency.p99_ms);
```

7. Get Job Logs
Get logs for a job.
Endpoint: GET /api/v1/jobs/{job_id}/logs
Query Parameters:
- `level` (optional): Filter by log level (`debug`, `info`, `warn`, `error`)
- `limit` (optional): Max log lines (default: 1000)
- `tail` (optional): Return last N lines (default: `false`)
Response (200 OK):
```json
{
  "job_id": "job-123...",
  "logs": [
    {
      "timestamp": "2025-10-29T10:00:05.123Z",
      "level": "info",
      "message": "Job started successfully",
      "task_id": null
    },
    {
      "timestamp": "2025-10-29T10:00:05.456Z",
      "level": "info",
      "message": "Kafka source connected to localhost:9092",
      "task_id": "task-1"
    },
    {
      "timestamp": "2025-10-29T10:01:00.789Z",
      "level": "info",
      "message": "Checkpoint 1 completed (45ms)",
      "task_id": null
    },
    {
      "timestamp": "2025-10-29T10:05:12.345Z",
      "level": "warn",
      "message": "Temporary backpressure detected",
      "task_id": "task-2"
    }
  ],
  "total": 4,
  "limit": 1000
}
```

Rust API:
```rust
let logs = job_manager.get_job_logs(
    job_id,
    LogOptions {
        level: Some(LogLevel::Warn),
        limit: 1000,
        tail: true,
    }
).await?;

for log in logs {
    println!("[{}] {}: {}", log.timestamp, log.level, log.message);
}
```

8. Restart Job
Restart a failed or cancelled job (optionally from savepoint).
Endpoint: POST /api/v1/jobs/{job_id}/restart
Request Body (optional):
```json
{
  "savepoint_id": "savepoint-789...",  // Optional
  "config_overrides": {                // Optional
    "parallelism": 8
  }
}
```

Response (200 OK):
```json
{
  "job_id": "job-123...",
  "status": "running",
  "restarted_at": "2025-10-29T11:05:00Z",
  "restored_from_savepoint": "savepoint-789...",
  "message": "Job restarted successfully"
}
```

Rust API:
```rust
// Restart from latest checkpoint
job_manager.restart_job(job_id, None).await?;

// Restart from specific savepoint
job_manager.restart_job(
    job_id,
    Some(RestartOptions {
        savepoint_id: Some(savepoint_id),
        config_overrides: None,
    })
).await?;
```

Rust API (Native)
JobManager
```rust
pub struct JobManager {
    job_registry: Arc<RwLock<HashMap<JobId, JobMetadata>>>,
    scheduler: Arc<Scheduler>,
    checkpoint_coordinator: Arc<CheckpointCoordinator>,
    metrics_collector: Arc<MetricsCollector>,
}
```
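The `job_registry` field above wraps the job table in `Arc<RwLock<…>>` so concurrent API handlers can share it: status queries take the lock as readers, while submit/cancel paths take it exclusively. A simplified, synchronous std-only sketch of that pattern, with `String` standing in for `JobId` and the metadata type:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Simplified stand-in for Arc<RwLock<HashMap<JobId, JobMetadata>>>.
type Registry = Arc<RwLock<HashMap<String, String>>>;

/// Writer path (e.g. submit_job): takes the lock exclusively.
fn submit(reg: &Registry, id: &str, status: &str) {
    reg.write().unwrap().insert(id.to_string(), status.to_string());
}

/// Reader path (e.g. get_job_status): many readers may hold the lock at once.
fn status_of(reg: &Registry, id: &str) -> Option<String> {
    reg.read().unwrap().get(id).cloned()
}

fn main() {
    let registry: Registry = Arc::new(RwLock::new(HashMap::new()));
    submit(&registry, "job-123", "running");
    assert_eq!(status_of(&registry, "job-123").as_deref(), Some("running"));
}
```

In the real `JobManager`, the lock would be an async-aware `RwLock` and the value a richer `JobMetadata` struct, but the sharing pattern is the same.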
```rust
impl JobManager {
    /// Create new job manager
    pub fn new(max_parallelism: usize) -> Result<Self>;

    /// Submit a new job
    pub async fn submit_job(&self, config: JobConfig) -> Result<JobId>;

    /// Cancel a running job
    pub async fn cancel_job(&self, job_id: JobId) -> Result<()>;

    /// Cancel job with savepoint
    pub async fn cancel_job_with_savepoint(&self, job_id: JobId) -> Result<SavepointId>;

    /// Create savepoint
    pub async fn create_savepoint(
        &self,
        job_id: JobId,
        description: Option<String>
    ) -> Result<SavepointId>;

    /// Restart job
    pub async fn restart_job(
        &self,
        job_id: JobId,
        options: Option<RestartOptions>
    ) -> Result<()>;

    /// Get job status
    pub async fn get_job_status(&self, job_id: JobId) -> Result<JobStatus>;

    /// Get job details
    pub async fn get_job_details(&self, job_id: JobId) -> Result<JobDetails>;

    /// Get job metrics
    pub async fn get_job_metrics(
        &self,
        job_id: JobId,
        window: MetricsWindow
    ) -> Result<JobMetrics>;

    /// Get job logs
    pub async fn get_job_logs(
        &self,
        job_id: JobId,
        options: LogOptions
    ) -> Result<Vec<LogEntry>>;

    /// List all jobs
    pub async fn list_jobs(&self, options: ListJobsOptions) -> Result<Vec<JobSummary>>;
}
```

Data Structures
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobConfig {
    pub name: String,
    pub parallelism: usize,
    pub checkpoint_interval: Duration,
    pub state_backend: StateBackendType,
    pub restart_strategy: RestartStrategy,
    pub sources: Vec<SourceConfig>,
    pub transformations: Vec<TransformationConfig>,
    pub sinks: Vec<SinkConfig>,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum JobStatus {
    Submitted,
    Running,
    Paused,
    Finished,
    Failed,
    Cancelled,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobDetails {
    pub job_id: JobId,
    pub name: String,
    pub status: JobStatus,
    pub config: JobConfig,
    pub submitted_at: SystemTime,
    pub started_at: Option<SystemTime>,
    pub finished_at: Option<SystemTime>,
    pub uptime_secs: u64,
    pub metrics: JobMetrics,
    pub tasks: Vec<TaskInfo>,
    pub checkpoints: Vec<CheckpointInfo>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobMetrics {
    pub events_processed: u64,
    pub throughput_per_sec: f64,
    pub latency_p50_ms: f64,
    pub latency_p99_ms: f64,
    pub checkpoint_count: u64,
    pub last_checkpoint_duration_ms: u64,
    pub backpressure_events: u64,
    pub restarts: u32,
    pub cpu_usage_percent: f64,
    pub memory_used_mb: u64,
}
```
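The restart strategies below differ mainly in how the retry delay evolves between attempts. As an illustration of the exponential-backoff variant, here is a std-only sketch that doubles the delay per attempt and caps it at `max_delay`; this is a hypothetical helper, and the scheduler's actual formula may differ:

```rust
use std::time::Duration;

/// Delay before restart attempt `attempt` (0-based): initial * 2^attempt,
/// capped at `max`. Illustrative only; not the shipped implementation.
fn backoff_delay(attempt: u32, initial: Duration, max: Duration) -> Duration {
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    initial.checked_mul(factor).unwrap_or(max).min(max)
}

fn main() {
    let initial = Duration::from_secs(1);
    let max = Duration::from_secs(60);
    let schedule: Vec<u64> = (0..8)
        .map(|a| backoff_delay(a, initial, max).as_secs())
        .collect();
    // Doubles each attempt, then saturates at the 60s cap.
    assert_eq!(schedule, vec![1, 2, 4, 8, 16, 32, 60, 60]);
}
```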
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RestartStrategy {
    /// No automatic restart
    None,
    /// Restart with fixed delay
    FixedDelay {
        attempts: u32,
        delay: Duration,
    },
    /// Restart with exponential backoff
    ExponentialBackoff {
        max_attempts: u32,
        initial_delay: Duration,
        max_delay: Duration,
    },
}
```

Prometheus Metrics
Job Metrics
- `heliosdb_job_count` (gauge): Number of active jobs
- `heliosdb_job_status` (gauge): Job status by `job_id`
- `heliosdb_job_events_processed_total` (counter): Total events processed
- `heliosdb_job_throughput` (gauge): Current throughput (events/sec)
- `heliosdb_job_latency_seconds` (histogram): Processing latency
- `heliosdb_job_checkpoint_duration_seconds` (histogram): Checkpoint duration
- `heliosdb_job_checkpoint_failures_total` (counter): Checkpoint failures
- `heliosdb_job_backpressure_events_total` (counter): Backpressure events
- `heliosdb_job_restarts_total` (counter): Job restarts
- `heliosdb_job_errors_total` (counter): Job errors
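For reference, a scrape of these metrics in the Prometheus text exposition format might look like the following; the sample values and the `job_id` label are illustrative assumptions, not documented output:

```text
# HELP heliosdb_job_count Number of active jobs
# TYPE heliosdb_job_count gauge
heliosdb_job_count 2
# HELP heliosdb_job_events_processed_total Total events processed
# TYPE heliosdb_job_events_processed_total counter
heliosdb_job_events_processed_total{job_id="job-123..."} 1523400
# HELP heliosdb_job_latency_seconds Processing latency
# TYPE heliosdb_job_latency_seconds histogram
heliosdb_job_latency_seconds_bucket{job_id="job-123...",le="0.005"} 912040
heliosdb_job_latency_seconds_sum{job_id="job-123..."} 6393.2
heliosdb_job_latency_seconds_count{job_id="job-123..."} 1523400
```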
Examples
Example 1: Submit and Monitor Job
```rust
use heliosdb_streaming::job::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Create job manager
    let job_manager = JobManager::new(16)?;

    // Configure job
    let config = JobConfig {
        name: "realtime_analytics".to_string(),
        parallelism: 4,
        checkpoint_interval: Duration::from_secs(60),
        state_backend: StateBackendType::RocksDb,
        restart_strategy: RestartStrategy::FixedDelay {
            attempts: 3,
            delay: Duration::from_secs(10),
        },
        sources: vec![
            SourceConfig::kafka(...),
        ],
        transformations: vec![
            TransformationConfig::filter(...),
            TransformationConfig::window(...),
        ],
        sinks: vec![
            SinkConfig::database(...),
        ],
    };

    // Submit job
    let job_id = job_manager.submit_job(config).await?;
    println!("Job submitted: {}", job_id);

    // Monitor job
    loop {
        let metrics = job_manager.get_job_metrics(job_id, MetricsWindow::Minutes(1)).await?;
        println!("Throughput: {} events/sec", metrics.throughput_per_sec);
        println!("Latency p99: {}ms", metrics.latency_p99_ms);

        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}
```

Example 2: Savepoint and Restart
```rust
// Create savepoint before maintenance
let savepoint_id = job_manager.create_savepoint(
    job_id,
    Some("Pre-maintenance savepoint".to_string())
).await?;

// Cancel job
job_manager.cancel_job(job_id).await?;

// Perform maintenance...

// Restart job from savepoint
job_manager.restart_job(
    job_id,
    Some(RestartOptions {
        savepoint_id: Some(savepoint_id),
        config_overrides: Some(JobConfigOverrides {
            parallelism: Some(8), // Increase parallelism
        }),
    })
).await?;
```

Testing
Integration Tests (10 tests)
- Submit job and verify status
- Cancel job
- Create savepoint
- Restart from savepoint
- Get job metrics
- Get job logs
- List jobs with filters
- Job failure and automatic restart
- Concurrent job management
- Resource cleanup after job completion
Acceptance Criteria
- All 10 integration tests passing
- HTTP API endpoints functional
- Rust API documented
- Prometheus metrics exported
- Grafana dashboards created
- Performance: API latency <100ms
- Documentation complete