Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,7 @@
UpstreamServiceLabelSelector: map[string]string{
"app.kubernetes.io/component": "bindings-forwarder",
},
RefreshDuration: time.Minute * 10,

Check warning on line 760 in cmd/api/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/api/main.go#L760

Added line #L760 was not covered by tests
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "BoundEndpoint")
os.Exit(1)
Expand All @@ -765,12 +766,10 @@
// Create a new Runnable that implements Start that the manager can manage running
if err := mgr.Add(&bindingscontroller.BoundEndpointPoller{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Log: ctrl.Log.WithName("controllers").WithName("BoundEndpointPoller"),
Recorder: mgr.GetEventRecorderFor("endpoint-binding-poller"),
Namespace: opts.namespace,
KubernetesOperatorConfigName: opts.releaseName,
EndpointSelectors: opts.bindings.endpointSelectors,
TargetServiceAnnotations: targetServiceAnnotations,
TargetServiceLabels: targetServiceLabels,
PollingInterval: 10 * time.Second,
Expand Down
154 changes: 78 additions & 76 deletions internal/controller/bindings/boundendpoint_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@

// UpstreamServiceLabelSelectors are the set of labels for the Pod Forwarders
UpstreamServiceLabelSelector map[string]string

RefreshDuration time.Duration
}

// +kubebuilder:rbac:groups=bindings.k8s.ngrok.com,resources=boundendpoints,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -166,11 +168,10 @@
return res, err
}

// TODO: if controller.IsUpsert(cr) ->
// send an update to the ngrok API to update the endpoint binding and status fields

// success
return ctrl.Result{}, nil
return ctrl.Result{
RequeueAfter: r.RefreshDuration,
}, nil

Check warning on line 174 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L172-L174

Added lines #L172 - L174 were not covered by tests
}

func (r *BoundEndpointReconciler) create(ctx context.Context, cr *bindingsv1alpha1.BoundEndpoint) error {
Expand All @@ -184,22 +185,12 @@
return r.controller.ReconcileStatus(ctx, cr, err)
}

if err := r.tryToBindEndpoint(ctx, cr); err != nil {
return r.controller.ReconcileStatus(ctx, cr, err)
}

return r.controller.ReconcileStatus(ctx, cr, nil)
}
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

Check warning on line 189 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L188-L189

Added lines #L188 - L189 were not covered by tests

// setEndpointsStatus sets the status of every endpoint on boundEndpoint to the desired status
// Note: All endpoints share the same status since they are represented by the same resources (Target/Upstream Services)
func setEndpointsStatus(boundEndpoint *bindingsv1alpha1.BoundEndpoint, desired *bindingsv1alpha1.BindingEndpoint) {
for i := range boundEndpoint.Status.Endpoints {
endpoint := &boundEndpoint.Status.Endpoints[i]
endpoint.Status = desired.Status
endpoint.ErrorCode = desired.ErrorCode
endpoint.ErrorMessage = desired.ErrorMessage
}
err := r.testBoundEndpointConnectivity(timeoutCtx, cr)
determineAndSetBindingEndpointStatus(cr, err)
return r.controller.ReconcileStatus(ctx, cr, err)

Check warning on line 193 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L191-L193

Added lines #L191 - L193 were not covered by tests
}

func (r *BoundEndpointReconciler) createTargetService(ctx context.Context, owner *bindingsv1alpha1.BoundEndpoint, service *v1.Service) error {
Expand Down Expand Up @@ -319,12 +310,13 @@
r.Recorder.Event(&existingTargetService, v1.EventTypeNormal, "Updated", "Updated Target Service")
}

if err := r.tryToBindEndpoint(ctx, cr); err != nil {
return r.controller.ReconcileStatus(ctx, cr, err)
}
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

Check warning on line 314 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L313-L314

Added lines #L313 - L314 were not covered by tests

err = r.testBoundEndpointConnectivity(timeoutCtx, cr)
determineAndSetBindingEndpointStatus(cr, err)

Check warning on line 317 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L316-L317

Added lines #L316 - L317 were not covered by tests
r.Recorder.Event(cr, v1.EventTypeNormal, "Updated", "Updated Services")
return r.controller.ReconcileStatus(ctx, cr, nil)
return r.controller.ReconcileStatus(ctx, cr, err)

Check warning on line 319 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L319

Added line #L319 was not covered by tests
}

func (r *BoundEndpointReconciler) delete(ctx context.Context, cr *bindingsv1alpha1.BoundEndpoint) error {
Expand Down Expand Up @@ -530,86 +522,96 @@
}

// tryToBindEndpoint attempts a TCP connection through the provisioned services for the BoundEndpoint
func (r *BoundEndpointReconciler) tryToBindEndpoint(ctx context.Context, boundEndpoint *bindingsv1alpha1.BoundEndpoint) error {
func (r *BoundEndpointReconciler) testBoundEndpointConnectivity(ctx context.Context, boundEndpoint *bindingsv1alpha1.BoundEndpoint) error {

Check warning on line 525 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L525

Added line #L525 was not covered by tests
log := ctrl.LoggerFrom(ctx).WithValues("uri", boundEndpoint.Spec.EndpointURI)

retries := 5
attempt := 0
waitDuration := 0 * time.Second // start immediately
backoffDuration := 3 * time.Second // increasing duration to wait between retries
dialTimeout := 1 * time.Second // timeout for dialing the targetService

// to be filled in
var bindErr error
for attempt < retries {
attempt++
time.Sleep(waitDuration) // wait for attempt to be ready

// 1. Parse URI: rely on kube-dns to resolve the targetService's ExternalName
uri, err := url.Parse(boundEndpoint.Spec.EndpointURI)
if err != nil {
bindErr = fmt.Errorf("failed to parse BoundEndpoint URI %s: %w", boundEndpoint.Spec.EndpointURI, err)
waitDuration += backoffDuration
continue
}
bindErrMsg := fmt.Sprintf("connectivity check failed for BoundEndpoint %s", boundEndpoint.Name)

Check warning on line 528 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L528

Added line #L528 was not covered by tests

// 2. Dial target
conn, err := net.DialTimeout("tcp", uri.Host, dialTimeout)
if err != nil {
log.Error(err, "Failed to bind BoundEndpoint", "attempt", attempt, "retries", retries)
bindErr = err
waitDuration += backoffDuration
continue
}
// start at 0, then 1, then backoff *= 2
backoff := time.Second * 0

// we should cancel long before we hit 8 retries, but just in case
// we forget to set a deadline or cancel the context let's make sure we don't run forever
retries := 8

// rely on kube-dns to resolve the targetService's ExternalName
uri, err := url.Parse(boundEndpoint.Spec.EndpointURI)
if err != nil {
wrappedErr := fmt.Errorf("failed to parse BoundEndpoint URI %s: %w", boundEndpoint.Spec.EndpointURI, err)
log.Error(wrappedErr, bindErrMsg, "uri", boundEndpoint.Spec.EndpointURI)
return wrappedErr
}

Check warning on line 543 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L530-L543

Added lines #L530 - L543 were not covered by tests

// 3. Connection exists, close it
if err := conn.Close(); err != nil {
log.Error(err, "Failed to close connection", "attempt", attempt, "retries", retries)
bindErr = err
waitDuration += backoffDuration
continue
for i := range retries {
select {
case <-ctx.Done():
err = fmt.Errorf("attempting to connect to BoundEndpoint URI timed out")
log.Error(err, bindErrMsg)
return err

Check warning on line 550 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L545-L550

Added lines #L545 - L550 were not covered by tests

case <-time.After(backoff):
if backoff == 0 {
backoff = time.Second * 1
} else {
backoff *= 2
}

Check warning on line 557 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L552-L557

Added lines #L552 - L557 were not covered by tests

conn, err := net.DialTimeout("tcp", uri.Host, time.Second*2)
if err != nil {
log.Error(err, "failed to dial endpoint uri", "attempt", i+1)
continue

Check warning on line 562 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L559-L562

Added lines #L559 - L562 were not covered by tests
}
// conn exists, close it
if err := conn.Close(); err != nil {
log.Error(err, "failed to close connection to endpoint uri", "attempt", i+1)
continue

Check warning on line 567 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L565-L567

Added lines #L565 - L567 were not covered by tests
}

// connection was good
return nil

Check warning on line 571 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L571

Added line #L571 was not covered by tests
}

// Success: dialled and closed cleanly
bindErr = nil
break
}

// update statuses
err = fmt.Errorf("exceeded max retries")
log.Error(err, bindErrMsg)
return err

Check warning on line 578 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L576-L578

Added lines #L576 - L578 were not covered by tests

}

// determineAndSetBindingEndpointStatus determines what the status of an endpoint should be
// based on the passed-in error and then calls setEndpointsStatus
func determineAndSetBindingEndpointStatus(boundEndpoint *bindingsv1alpha1.BoundEndpoint, err error) {

Check warning on line 584 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L584

Added line #L584 was not covered by tests
var desired *bindingsv1alpha1.BindingEndpoint
var ngrokErr *ngrok.Error
if bindErr != nil {
if err != nil {

Check warning on line 587 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L587

Added line #L587 was not covered by tests
// error
log.Error(bindErr, "Failed to bind BoundEndpoint, moving to error")
ngrokErr = ngrokapi.NewNgrokError(bindErr, ngrokapi.NgrokOpErrFailedToConnectServices, "Failed to bind BoundEndpoint")
ngrokErr = ngrokapi.NewNgrokError(err, ngrokapi.NgrokOpErrFailedToConnectServices, "failed to bind BoundEndpoint")

Check warning on line 589 in internal/controller/bindings/boundendpoint_controller.go

View check run for this annotation

Codecov / codecov/patch

internal/controller/bindings/boundendpoint_controller.go#L589

Added line #L589 was not covered by tests
desired = &bindingsv1alpha1.BindingEndpoint{
Status: bindingsv1alpha1.StatusError,
ErrorCode: ngrokErr.ErrorCode,
ErrorMessage: ngrokErr.Error(),
}
} else {
// success
log.Info("Bound BoundEndpoint successfully, moving to bound")
desired = &bindingsv1alpha1.BindingEndpoint{
Status: bindingsv1alpha1.StatusBound,
ErrorCode: "",
ErrorMessage: "",
}

}

// set status
setEndpointsStatus(boundEndpoint, desired)
}

// Why do we check for nil here and return nil explicitly instead of just `return ngrokErr`?
// Because *ngrok.Error meets the interface of error means that error is actually (T=*ngrok.Error, V=nil)
// so outer function signature is (error) and not (*ngrok.Error)
// and an interface _pointing_ to nil is != to nil itself
// therefore *ngrok.Error.(error) is _always_ != nil
//
// See: https://go.dev/doc/faq#nil_error
if ngrokErr != nil {
return ngrokErr
// setEndpointsStatus sets the status of every endpoint on boundEndpoint to the desired status
// Note: All endpoints share the same status since they are represented by the same resources (Target/Upstream Services)
func setEndpointsStatus(boundEndpoint *bindingsv1alpha1.BoundEndpoint, desired *bindingsv1alpha1.BindingEndpoint) {
for i := range boundEndpoint.Status.Endpoints {
endpoint := &boundEndpoint.Status.Endpoints[i]
endpoint.Status = desired.Status
endpoint.ErrorCode = desired.ErrorCode
endpoint.ErrorMessage = desired.ErrorMessage
}
return nil
}
5 changes: 0 additions & 5 deletions internal/controller/bindings/boundendpoint_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/ngrok/ngrok-operator/internal/ngrokapi"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -33,7 +32,6 @@ type PortRangeConfig struct {
// BoundEndpointPoller is a process to poll the ngrok API for binding_endpoints and reconcile the desired state with the cluster state of BoundEndpoints
type BoundEndpointPoller struct {
client.Client
Scheme *runtime.Scheme
Log logr.Logger
Recorder record.EventRecorder

Expand All @@ -46,9 +44,6 @@ type BoundEndpointPoller struct {
// NgrokClientset is the ngrok API clientset
NgrokClientset ngrokapi.Clientset

// EndpointSelectors is a list of cel expressions for filtering endpoints that will be projected into the cluster
EndpointSelectors []string

// PollingInterval is how often to poll the ngrok API for reconciling the BindingEndpoints
PollingInterval time.Duration

Expand Down
Loading