-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.go
More file actions
316 lines (277 loc) · 10.7 KB
/
client.go
File metadata and controls
316 lines (277 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
// Copyright 2025 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sdk
import (
"context"
"fmt"
"iter"
"slices"
"time"
"github.com/pipe-cd/piped-plugin-sdk-go/toolregistry"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/pipe-cd/pipecd/pkg/model"
"github.com/pipe-cd/pipecd/pkg/plugin/pipedservice"
"github.com/pipe-cd/pipecd/pkg/rpc/rpcclient"
)
const (
// MetadataKeyStageDisplay is the key of the stage metadata to be displayed on the deployment detail UI.
MetadataKeyStageDisplay = model.MetadataKeyStageDisplay
// MetadataKeyStageApprovedUsers is the key of the metadata of who approved the stage.
// It will be displayed in the DEPLOYMENT_APPROVED notification.
// e.g. user-1,user-2
MetadataKeyStageApprovedUsers = model.MetadataKeyStageApprovedUsers
listStageCommandsInterval = 5 * time.Second
)
type pluginServiceClient struct {
pipedservice.PluginServiceClient
conn *grpc.ClientConn
}
func newPluginServiceClient(ctx context.Context, address string, opts ...rpcclient.DialOption) (*pluginServiceClient, error) {
// Clone the opts to avoid modifying the original opts slice.
opts = slices.Clone(opts)
// Append the required options.
// The WithBlock option is required to make the client wait until the connection is up.
// The WithInsecure option is required to disable the transport security.
// The piped service does not require transport security because it is only used in localhost.
opts = append(opts, rpcclient.WithBlock(), rpcclient.WithInsecure())
conn, err := rpcclient.DialContext(ctx, address, opts...)
if err != nil {
return nil, err
}
return &pluginServiceClient{
PluginServiceClient: pipedservice.NewPluginServiceClient(conn),
conn: conn,
}, nil
}
func (c *pluginServiceClient) Close() error {
return c.conn.Close()
}
// Client is a toolkit for interacting with the piped service.
// It provides methods to call the piped service APIs.
// It's a wrapper around the raw piped service client.
type Client struct {
base *pluginServiceClient
// pluginName is used to identify which plugin sends requests to piped.
pluginName string
// applicationID is used to identify the application that the client is working with.
applicationID string
// deploymentID is used to identify the deployment that the client is working with.
// This field exists only when the client is working with a specific deployment; for example, when this client is passed as the deployoment plugin's argument.
deploymentID string
// stageID is used to identify the stage that the client is working with.
// This field exists only when the client is working with a specific stage; for example, when this client is passed as the ExecuteStage method's argument.
stageID string
// stageLogPersister is used to persist the stage logs.
// This field exists only when the client is working with a specific stage; for example, when this client is passed as the ExecuteStage method's argument.
stageLogPersister StageLogPersister
// toolRegistry is used to install and get the path of the tools used in the plugin.
// TODO: We should consider installing the tools in other way.
toolRegistry *toolregistry.ToolRegistry
}
// NewClient creates a new client.
// DO NOT USE this function except in tests.
// FIXME: Remove this function and make a better way for tests.
func NewClient(base *pluginServiceClient, pluginName, applicationID, stageID string, slp StageLogPersister, tr *toolregistry.ToolRegistry) *Client {
return &Client{
base: base,
pluginName: pluginName,
applicationID: applicationID,
stageID: stageID,
stageLogPersister: slp,
toolRegistry: tr,
}
}
// StageLogPersister is a interface for persisting the stage logs.
// Use this to persist the stage logs and make it viewable on the UI.
type StageLogPersister interface {
Write(log []byte) (int, error)
Info(log string)
Infof(format string, a ...interface{})
Success(log string)
Successf(format string, a ...interface{})
Error(log string)
Errorf(format string, a ...interface{})
}
// GetStageMetadata gets the metadata of the current stage.
func (c *Client) GetStageMetadata(ctx context.Context, key string) (string, bool, error) {
resp, err := c.base.GetStageMetadata(ctx, &pipedservice.GetStageMetadataRequest{
DeploymentId: c.deploymentID,
StageId: c.stageID,
Key: key,
})
if err != nil {
return "", false, err
}
return resp.Value, resp.Found, nil
}
// PutStageMetadata stores the metadata of the current stage.
func (c *Client) PutStageMetadata(ctx context.Context, key, value string) error {
_, err := c.base.PutStageMetadata(ctx, &pipedservice.PutStageMetadataRequest{
DeploymentId: c.deploymentID,
StageId: c.stageID,
Key: key,
Value: value,
})
return err
}
// PutStageMetadataMulti stores the multiple metadata of the current stage.
func (c *Client) PutStageMetadataMulti(ctx context.Context, metadata map[string]string) error {
_, err := c.base.PutStageMetadataMulti(ctx, &pipedservice.PutStageMetadataMultiRequest{
DeploymentId: c.deploymentID,
StageId: c.stageID,
Metadata: metadata,
})
return err
}
// GetDeploymentPluginMetadata gets the metadata of the current deployment and plugin.
func (c *Client) GetDeploymentPluginMetadata(ctx context.Context, key string) (string, bool, error) {
resp, err := c.base.GetDeploymentPluginMetadata(ctx, &pipedservice.GetDeploymentPluginMetadataRequest{
DeploymentId: c.deploymentID,
PluginName: c.pluginName,
Key: key,
})
if err != nil {
return "", false, err
}
return resp.Value, resp.Found, err
}
// PutDeploymentPluginMetadata stores the metadata of the current deployment and plugin.
func (c *Client) PutDeploymentPluginMetadata(ctx context.Context, key, value string) error {
_, err := c.base.PutDeploymentPluginMetadata(ctx, &pipedservice.PutDeploymentPluginMetadataRequest{
DeploymentId: c.deploymentID,
PluginName: c.pluginName,
Key: key,
Value: value,
})
return err
}
// PutDeploymentPluginMetadataMulti stores the multiple metadata of the current deployment and plugin.
func (c *Client) PutDeploymentPluginMetadataMulti(ctx context.Context, metadata map[string]string) error {
_, err := c.base.PutDeploymentPluginMetadataMulti(ctx, &pipedservice.PutDeploymentPluginMetadataMultiRequest{
DeploymentId: c.deploymentID,
PluginName: c.pluginName,
Metadata: metadata,
})
return err
}
// GetDeploymentSharedMetadata gets the metadata of the current deployment
// which is shared among piped and plugins.
func (c *Client) GetDeploymentSharedMetadata(ctx context.Context, key string) (string, bool, error) {
resp, err := c.base.GetDeploymentSharedMetadata(ctx, &pipedservice.GetDeploymentSharedMetadataRequest{
DeploymentId: c.deploymentID,
Key: key,
})
if err != nil {
return "", false, err
}
return resp.Value, resp.Found, err
}
// GetApplicationSharedObject gets the application object which is shared across deployments.
func (c *Client) GetApplicationSharedObject(ctx context.Context, key string) (obj []byte, found bool, err error) {
resp, err := c.base.GetApplicationSharedObject(ctx, &pipedservice.GetApplicationSharedObjectRequest{
ApplicationId: c.applicationID,
PluginName: c.pluginName,
Key: key,
})
if status.Code(err) == codes.NotFound {
return nil, false, nil
}
if err != nil {
return nil, false, err
}
return resp.Object, true, nil
}
// PutApplicationSharedObject stores the application object which is shared across deployments.
func (c *Client) PutApplicationSharedObject(ctx context.Context, key string, object []byte) error {
_, err := c.base.PutApplicationSharedObject(ctx, &pipedservice.PutApplicationSharedObjectRequest{
ApplicationId: c.applicationID,
PluginName: c.pluginName,
Key: key,
Object: object,
})
return err
}
// StageLogPersister returns the stage log persister.
// Use this to persist the stage logs and make it viewable on the UI.
// This method should be called only when the client is working with a specific stage, for example, when this client is passed as the ExecuteStage method's argument.
func (c *Client) StageLogPersister() (StageLogPersister, error) {
if c.stageLogPersister == nil {
return nil, fmt.Errorf("stage log persister is not set")
}
return c.stageLogPersister, nil
}
// LogPersister returns the stage log persister.
// Use this to persist the stage logs and make it viewable on the UI.
// This method should be called only when the client is working with a specific stage, for example, when this client is passed as the ExecuteStage method's argument.
// Otherwise, it will return nil.
// Deprecated: use StageLogPersister instead.
func (c *Client) LogPersister() StageLogPersister {
return c.stageLogPersister
}
// ToolRegistry returns the tool registry.
// Use this to install and get the path of the tools used in the plugin.
func (c *Client) ToolRegistry() *toolregistry.ToolRegistry {
return c.toolRegistry
}
// ListStageCommands returns the list of stage commands of the given command types.
func (c Client) ListStageCommands(ctx context.Context, commandTypes ...CommandType) iter.Seq2[*StageCommand, error] {
return func(yield func(*StageCommand, error) bool) {
returned := map[string]struct{}{}
modelCommandTypes := make([]model.Command_Type, 0, len(commandTypes))
for _, cmdType := range commandTypes {
modelType, err := cmdType.toModelEnum()
if err != nil {
if !yield(nil, err) {
return
}
continue
}
modelCommandTypes = append(modelCommandTypes, modelType)
}
for {
resp, err := c.base.ListStageCommands(ctx, &pipedservice.ListStageCommandsRequest{
DeploymentId: c.deploymentID,
StageId: c.stageID,
})
if err != nil {
if !yield(nil, err) {
return
}
continue
}
for _, command := range resp.Commands {
if !slices.Contains(modelCommandTypes, command.Type) {
continue
}
if _, ok := returned[command.Id]; ok {
continue
}
returned[command.Id] = struct{}{}
stageCommand, err := newStageCommand(command)
if err != nil {
if !yield(nil, err) {
return
}
continue
}
if !yield(&stageCommand, nil) {
return
}
}
time.Sleep(listStageCommandsInterval)
}
}
}