from django.contrib import admin from .models import PipelineStep, PipelineRun, PipelineProductRun class PipelineStepAdmin(admin.ModelAdmin): list_display = ('name', 'description') # Specify the fields to display class PipelineProductRunAdmin(admin.ModelAdmin): list_display = ('pipeline_run', 'step', 'status', 'date') list_filter = ('status', 'step') # Add filtering by status and step search_fields = ('pipeline_run__run_identifier', 'step__name') # Enable search by pipeline_run identifier and step name # Register your models with custom ModelAdmin admin.site.register(PipelineStep, PipelineStepAdmin) admin.site.register(PipelineProductRun, PipelineProductRunAdmin) # pxy_de/admin.py (or wherever your PipelineRun admin is) from django.contrib import messages from pxy_de.pipelines.pxy_products.run_products import run_pipeline def run_pipeline_action(modeladmin, request, queryset): try: run_pipeline() messages.success(request, "Pipeline executed successfully.") except Exception as e: messages.error(request, f"Pipeline failed to execute: {str(e)}") run_pipeline_action.short_description = "Run Products Pipeline" class PipelineRunAdmin(admin.ModelAdmin): list_display = ['run_identifier', 'overall_status', 'date'] ordering = ['-date'] actions = [run_pipeline_action] admin.site.register(PipelineRun, PipelineRunAdmin)