37 lines
1.4 KiB
Python

from django.contrib import admin
from .models import PipelineStep, PipelineRun, PipelineProductRun
class PipelineStepAdmin(admin.ModelAdmin):
list_display = ('name', 'description') # Specify the fields to display
class PipelineProductRunAdmin(admin.ModelAdmin):
list_display = ('pipeline_run', 'step', 'status', 'date')
list_filter = ('status', 'step') # Add filtering by status and step
search_fields = ('pipeline_run__run_identifier', 'step__name') # Enable search by pipeline_run identifier and step name
# Register your models with custom ModelAdmin
admin.site.register(PipelineStep, PipelineStepAdmin)
admin.site.register(PipelineProductRun, PipelineProductRunAdmin)
# pxy_de/admin.py (or wherever your PipelineRun admin is)
from django.contrib import messages
from pxy_de.pipelines.pxy_products.run_products import run_pipeline
def run_pipeline_action(modeladmin, request, queryset):
try:
run_pipeline()
messages.success(request, "Pipeline executed successfully.")
except Exception as e:
messages.error(request, f"Pipeline failed to execute: {str(e)}")
run_pipeline_action.short_description = "Run Products Pipeline"
class PipelineRunAdmin(admin.ModelAdmin):
list_display = ['run_identifier', 'overall_status', 'date']
ordering = ['-date']
actions = [run_pipeline_action]
admin.site.register(PipelineRun, PipelineRunAdmin)