11package main
22
33import (
4+ "encoding/json"
45 "fmt"
56 "net/http"
67 "os"
78 "strings"
9+ "sync"
810 "time"
911
12+ notificationapi "github.com/argoproj/notifications-engine/pkg/api"
13+ notificationcontroller "github.com/argoproj/notifications-engine/pkg/controller"
1014 "github.com/argoproj/pkg/kubeclientmetrics"
15+ "github.com/go-logr/logr"
1116 smiclientset "github.com/servicemeshinterface/smi-sdk-go/pkg/gen/client/split/clientset/versioned"
1217 log "github.com/sirupsen/logrus"
1318 "github.com/spf13/cobra"
1419 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
20+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
21+ k8sruntime "k8s.io/apimachinery/pkg/runtime"
22+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
1523 "k8s.io/client-go/discovery"
1624 "k8s.io/client-go/dynamic"
1725 "k8s.io/client-go/dynamic/dynamicinformer"
1826 kubeinformers "k8s.io/client-go/informers"
1927 "k8s.io/client-go/kubernetes"
28+ clientgoscheme "k8s.io/client-go/kubernetes/scheme"
2029 _ "k8s.io/client-go/plugin/pkg/client/auth/azure"
2130 _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
2231 _ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
2332 "k8s.io/client-go/tools/clientcmd"
33+ ctrl "sigs.k8s.io/controller-runtime"
34+ "sigs.k8s.io/controller-runtime/pkg/cache"
35+ ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
36+ metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
2437
2538 "github.com/argoproj/argo-rollouts/metricproviders"
39+ "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
2640 "github.com/argoproj/argo-rollouts/rollout"
41+ "github.com/argoproj/argo-rollouts/rolloutplugin"
42+ statefulset "github.com/argoproj/argo-rollouts/rolloutplugin/plugins/statefulset"
2743 "github.com/argoproj/argo-rollouts/utils/errors"
2844 "github.com/argoproj/argo-rollouts/utils/record"
2945
@@ -47,10 +63,28 @@ const (
4763 jsonFormat = "json"
4864 textFormat = "text"
4965
50- controllerAnalysis = "analysis"
66+ controllerAnalysis = "analysis"
67+ controllerRolloutPlugin = "rolloutplugin"
68+ listenAddr = "0.0.0.0:%d"
5169)
5270
53- var supportedControllers = map [string ]bool {controllerAnalysis : true }
71+ var supportedControllers = map [string ]bool {
72+ controllerAnalysis : true ,
73+ controllerRolloutPlugin : true ,
74+ }
75+
76+ var (
77+ scheme = k8sruntime .NewScheme ()
78+ )
79+
80+ func init () {
81+ // Set controller-runtime logger to a null logger to suppress the warning
82+ // We use logrus for our own logging
83+ ctrl .SetLogger (logr .New (ctrllog.NullLogSink {}))
84+
85+ utilruntime .Must (clientgoscheme .AddToScheme (scheme ))
86+ utilruntime .Must (v1alpha1 .AddToScheme (scheme ))
87+ }
5488
5589func newCommand () * cobra.Command {
5690 var (
@@ -225,11 +259,24 @@ func newCommand() *cobra.Command {
225259 go func () { log .Println (http .ListenAndServe (pprofAddress , mux )) }()
226260 }
227261
228- var cm * controller.Manager
229-
230262 enabledControllers , err := getEnabledControllers (controllersEnabled )
231263 errors .CheckError (err )
232264
265+ // Create shared MetricsServer that will be used by all controllers
266+ log .Info ("Creating shared metrics server" )
267+ metricsServer := metrics .NewMetricsServer (metrics.ServerConfig {
268+ Addr : fmt .Sprintf (listenAddr , metricsPort ),
269+ RolloutLister : tolerantinformer .NewTolerantRolloutInformer (dynamicInformerFactory ).Lister (),
270+ AnalysisRunLister : tolerantinformer .NewTolerantAnalysisRunInformer (dynamicInformerFactory ).Lister (),
271+ AnalysisTemplateLister : tolerantinformer .NewTolerantAnalysisTemplateInformer (dynamicInformerFactory ).Lister (),
272+ ClusterAnalysisTemplateLister : tolerantinformer .NewTolerantClusterAnalysisTemplateInformer (clusterDynamicInformerFactory ).Lister (),
273+ ExperimentLister : tolerantinformer .NewTolerantExperimentInformer (dynamicInformerFactory ).Lister (),
274+ RolloutPluginLister : tolerantinformer .NewTolerantRolloutPluginInformer (dynamicInformerFactory ).Lister (),
275+ K8SRequestProvider : k8sRequestProvider ,
276+ })
277+
278+ var cm * controller.Manager
279+
233280 // currently only supports running analysis controller independently
234281 if enabledControllers [controllerAnalysis ] {
235282 log .Info ("Running only analysis controller" )
@@ -242,9 +289,8 @@ func newCommand() *cobra.Command {
242289 tolerantinformer .NewTolerantAnalysisTemplateInformer (dynamicInformerFactory ),
243290 tolerantinformer .NewTolerantClusterAnalysisTemplateInformer (clusterDynamicInformerFactory ),
244291 resyncDuration ,
245- metricsPort ,
292+ metricsServer ,
246293 healthzPort ,
247- k8sRequestProvider ,
248294 dynamicInformerFactory ,
249295 clusterDynamicInformerFactory ,
250296 namespaced ,
@@ -274,9 +320,8 @@ func newCommand() *cobra.Command {
274320 notificationSecretInformerFactory ,
275321 resyncDuration ,
276322 instanceID ,
277- metricsPort ,
323+ metricsServer ,
278324 healthzPort ,
279- k8sRequestProvider ,
280325 nginxIngressClasses ,
281326 albIngressClasses ,
282327 dynamicInformerFactory ,
@@ -288,9 +333,168 @@ func newCommand() *cobra.Command {
288333 ephemeralMetadataThreads ,
289334 ephemeralMetadataPodRetries )
290335 }
291- if err = cm .Run (ctx , rolloutThreads , serviceThreads , ingressThreads , experimentThreads , analysisThreads , electOpts ); err != nil {
292- log .Fatalf ("Error running controller: %s" , err .Error ())
336+
337+ // Setup RolloutPlugin Controller if enabled (uses controller-runtime)
338+ setupRolloutPlugin := enabledControllers [controllerRolloutPlugin ]
339+
340+ if setupRolloutPlugin {
341+ log .Info ("Setting up RolloutPlugin controller" )
342+
343+ // Determine leader election namespace
344+ leaderElectionNamespace := electOpts .LeaderElectionNamespace
345+ if leaderElectionNamespace == "" {
346+ if namespaced && namespace != metav1 .NamespaceAll {
347+ leaderElectionNamespace = namespace
348+ } else {
349+ leaderElectionNamespace = defaults .Namespace ()
350+ }
351+ // Update electOpts with the computed namespace for standard controllers
352+ electOpts .LeaderElectionNamespace = leaderElectionNamespace
353+ }
354+
355+ mgrOpts := ctrl.Options {
356+ Scheme : scheme ,
357+ Metrics : metricsserver.Options {
358+ BindAddress : "0" , // Disable metrics server, use standard controller's metrics
359+ },
360+ HealthProbeBindAddress : "0" , // Disable health probe, use standard controller's healthz
361+ LeaderElection : false , // Disable leader election, use standard controller's leader election
362+ LeaderElectionID : controller .GetLeaderElectionLeaseLockName (),
363+ LeaderElectionNamespace : leaderElectionNamespace ,
364+ }
365+ if namespaced && namespace != metav1 .NamespaceAll {
366+ log .WithField ("namespace" , namespace ).Info ("RolloutPlugin controller running in namespaced mode" )
367+ mgrOpts .Cache = cache.Options {
368+ DefaultNamespaces : map [string ]cache.Config {
369+ namespace : {},
370+ },
371+ }
372+ } else {
373+ log .Info ("RolloutPlugin controller running in cluster-scoped mode" )
374+ }
375+
376+ mgr , err := ctrl .NewManager (config , mgrOpts )
377+ if err != nil {
378+ log .Fatalf ("Failed to create controller-runtime manager: %s" , err .Error ())
379+ }
380+
381+ // Get the singleton plugin manager instance
382+ pluginManager := rolloutplugin .GetGlobalPluginManager ()
383+
384+ logrusCtx := log .WithField ("plugin" , "statefulset" )
385+ // Initialize built-in plugins
386+ statefulSetPlugin := statefulset .NewPlugin (logrusCtx )
387+
388+ if err := pluginManager .RegisterPlugin ("statefulset" , statefulSetPlugin , namespace ); err != nil {
389+ log .Fatalf ("Failed to register statefulset plugin: %s" , err .Error ())
390+ }
391+ log .Info ("Registered StatefulSet plugin" )
392+
393+ // Create EventRecorder for RolloutPlugin notifications
394+ rolloutPluginApiFactory := notificationapi .NewFactory (
395+ record .NewAPIFactorySettingsForRolloutPlugin (tolerantinformer .NewTolerantAnalysisRunInformer (dynamicInformerFactory )),
396+ defaults .Namespace (),
397+ notificationSecretInformerFactory .Core ().V1 ().Secrets ().Informer (),
398+ notificationConfigMapInformerFactory .Core ().V1 ().ConfigMaps ().Informer (),
399+ )
400+ rolloutPluginRecorder := record .NewEventRecorder (
401+ kubeClient ,
402+ metrics .MetricRolloutEventsTotal ,
403+ metrics .MetricNotificationFailedTotal ,
404+ metrics .MetricNotificationSuccessTotal ,
405+ metrics .MetricNotificationSend ,
406+ rolloutPluginApiFactory ,
407+ )
408+
409+ // Create NotificationController for RolloutPlugin for custom triggers
410+ rolloutPluginNotificationsController := notificationcontroller .NewControllerWithNamespaceSupport (
411+ dynamicClient .Resource (v1alpha1 .RolloutPluginGVR ),
412+ tolerantinformer .NewTolerantRolloutPluginInformer (dynamicInformerFactory ).Informer (),
413+ rolloutPluginApiFactory ,
414+ notificationcontroller .WithToUnstructured (func (obj metav1.Object ) (* unstructured.Unstructured , error ) {
415+ data , err := json .Marshal (obj )
416+ if err != nil {
417+ return nil , err
418+ }
419+ res := & unstructured.Unstructured {}
420+ err = json .Unmarshal (data , res )
421+ if err != nil {
422+ return nil , err
423+ }
424+ return res , nil
425+ }),
426+ )
427+
428+ // Set up the RolloutPlugin controller
429+ if err = (& rolloutplugin.RolloutPluginReconciler {
430+ Client : mgr .GetClient (),
431+ Scheme : mgr .GetScheme (),
432+ KubeClientset : kubeClient ,
433+ ArgoProjClientset : argoprojClient ,
434+ DynamicClientset : dynamicClient ,
435+ PluginManager : pluginManager ,
436+ InstanceID : instanceID ,
437+ MetricsServer : metricsServer ,
438+ Recorder : rolloutPluginRecorder ,
439+ }).SetupWithManager (mgr ); err != nil {
440+ log .Fatalf ("Failed to setup RolloutPlugin controller: %s" , err .Error ())
441+ }
442+
443+ // Run both the standard controllers and the RolloutPlugin controller concurrently
444+ var wg sync.WaitGroup
445+
446+ wg .Add (1 )
447+ go func () {
448+ defer wg .Done ()
449+ log .Info ("Starting standard controllers" )
450+ if err := cm .Run (ctx , rolloutThreads , serviceThreads , ingressThreads , experimentThreads , analysisThreads , electOpts ); err != nil {
451+ log .WithError (err ).Error ("Error running standard controllers" )
452+ os .Exit (1 )
453+ }
454+ log .Info ("Standard controllers stopped" )
455+ }()
456+
457+ // Start NotificationController for RolloutPlugin.
458+ wg .Add (1 )
459+ go func () {
460+ defer wg .Done ()
461+ log .Info ("Starting RolloutPlugin notifications controller" )
462+ rolloutPluginNotificationsController .Run (rolloutThreads , ctx .Done ())
463+ log .Info ("RolloutPlugin notifications controller stopped" )
464+ }()
465+
466+ // Start controller-runtime manager for RolloutPlugin
467+ wg .Add (1 )
468+ go func () {
469+ defer wg .Done ()
470+ log .Info ("Starting RolloutPlugin controller (controller-runtime)" )
471+ if err := mgr .Start (ctx ); err != nil {
472+ log .WithError (err ).Error ("Error running RolloutPlugin controller" )
473+ os .Exit (1 )
474+ }
475+ log .Info ("RolloutPlugin controller stopped" )
476+ }()
477+
478+ log .Info ("All controllers started successfully" )
479+
480+ // Wait for context cancellation
481+ <- ctx .Done ()
482+ log .Info ("Received shutdown signal, waiting for controllers to stop" )
483+
484+ // Wait for all controllers to finish
485+ wg .Wait ()
486+ log .Info ("All controllers stopped gracefully" )
487+ } else {
488+ // RolloutPlugin controller is disabled, run only standard controllers
489+ log .Info ("RolloutPlugin controller disabled, running only standard controllers" )
490+ log .Info ("Starting standard controllers" )
491+ if err := cm .Run (ctx , rolloutThreads , serviceThreads , ingressThreads , experimentThreads , analysisThreads , electOpts ); err != nil {
492+ log .WithError (err ).Error ("Error running standard controllers" )
493+ return err
494+ }
495+ log .Info ("Standard controllers stopped gracefully" )
293496 }
497+
294498 return nil
295499 },
296500 }
@@ -338,7 +542,7 @@ func newCommand() *cobra.Command {
338542 command .Flags ().DurationVar (& electOpts .LeaderElectionRenewDeadline , "leader-election-renew-deadline" , controller .DefaultLeaderElectionRenewDeadline , "The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. This is only applicable if leader election is enabled." )
339543 command .Flags ().DurationVar (& electOpts .LeaderElectionRetryPeriod , "leader-election-retry-period" , controller .DefaultLeaderElectionRetryPeriod , "The duration the clients should wait between attempting acquisition and renewal of a leadership. This is only applicable if leader election is enabled." )
340544 command .Flags ().BoolVar (& selfServiceNotificationEnabled , "self-service-notification-enabled" , false , "Allows rollouts controller to pull notification config from the namespace that the rollout resource is in. This is useful for self-service notification." )
341- command .Flags ().StringSliceVar (& controllersEnabled , "controllers" , nil , "Explicitly specify the list of controllers to run, currently only supports 'analysis', eg. --controller =analysis. Default: all controllers are enabled " )
545+ command .Flags ().StringSliceVar (& controllersEnabled , "controllers" , nil , "Explicitly specify the list of controllers to run. Supported values: 'analysis', 'rolloutplugin'. Examples: --controllers =analysis (analysis only), --controllers=rolloutplugin (rolloutplugin only), --controllers=analysis,rolloutplugin (analysis + rolloutplugin) . Default: analysis + rollout/experiment/service/ingress controllers (no rolloutplugin) " )
342546 command .Flags ().StringVar (& pprofAddress , "enable-pprof-address" , "" , "Enable pprof profiling on controller by providing a server address." )
343547 return & command
344548}
0 commit comments