Skip to content

Commit

Permalink
fix: update pipeline cms according to pipeline cron which enable is t…
Browse files Browse the repository at this point in the history
…rue (#1741)

* fix: update pipeline cms according to pipeline cron which enable is true

* fix: polish the code

* fix: add etcd lock to ensure that compentsate is executed only once

* fix: filter label first
  • Loading branch information
littlejiancc authored Sep 10, 2021
1 parent 240b172 commit d85b150
Show file tree
Hide file tree
Showing 14 changed files with 248 additions and 39 deletions.
26 changes: 16 additions & 10 deletions apistructs/pipeline_cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type PipelineCronPagingRequest struct {
AllSources bool `schema:"allSources"`
Sources []PipelineSource `schema:"source"` // ?source=cdp-dev&source=cdp-test
YmlNames []string `schema:"ymlName"` // ?ymlName=11&ymlName=22
Enable *bool `schema:"enable"`

PageSize int `schema:"pageSize"`
PageNo int `schema:"pageNo"`
Expand All @@ -42,13 +43,17 @@ type PipelineCronDTO struct {
TimeCreated time.Time `json:"timeCreated"` // 记录创建时间
TimeUpdated time.Time `json:"timeUpdated"` // 记录更新时间

ApplicationID uint64 `json:"applicationID"`
Branch string `json:"branch"`
CronExpr string `json:"cronExpr"`
CronStartTime *time.Time `json:"cronStartTime"`
PipelineYmlName string `json:"pipelineYmlName"` // 一个分支下可以有多个 pipeline 文件,每个分支可以有单独的 cron 逻辑
BasePipelineID uint64 `json:"basePipelineID"` // 用于记录最开始创建出这条 cron 记录的 pipeline id
Enable *bool `json:"enable"` // 1 true, 0 false
ApplicationID uint64 `json:"applicationID"`
Branch string `json:"branch"`
CronExpr string `json:"cronExpr"`
CronStartTime *time.Time `json:"cronStartTime"`
PipelineYmlName string `json:"pipelineYmlName"` // 一个分支下可以有多个 pipeline 文件,每个分支可以有单独的 cron 逻辑
BasePipelineID uint64 `json:"basePipelineID"` // 用于记录最开始创建出这条 cron 记录的 pipeline id
Enable *bool `json:"enable"` // 1 true, 0 false
PipelineYml string `json:"pipelineYml"`
ConfigManageNamespaces []string `json:"configManageNamespaces"`
UserID string `json:"userID"`
OrgID uint64 `json:"orgID"`
}

type PipelineCronCreateRequest struct {
Expand All @@ -65,9 +70,10 @@ type PipelineCronDeleteResponse struct {
}

type PipelineCronUpdateRequest struct {
ID uint64 `json:"id"`
PipelineYml string `json:"pipelineYml"`
CronExpr string `json:"cronExpr"`
ID uint64 `json:"id"`
PipelineYml string `json:"pipelineYml"`
CronExpr string `json:"cronExpr"`
ConfigManageNamespaces []string `json:"configManageNamespaces"`
}

type PipelineCronUpdateResponse struct {
Expand Down
9 changes: 6 additions & 3 deletions bundle/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,17 @@ func (b *Bundle) PageListPipelineCrons(req apistructs.PipelineCronPagingRequest)
}

var pageResp apistructs.PipelineCronPagingResponse
httpResp, err := hc.Get(host).Path(fmt.Sprintf("/api/pipeline-crons")).
request := hc.Get(host).Path(fmt.Sprintf("/api/pipeline-crons")).
Header(httputil.InternalHeader, "bundle").
Param("allSources", strconv.FormatBool(req.AllSources)).
Params(map[string][]string{"source": sources}).
Params(map[string][]string{"ymlName": req.YmlNames}).
Param("pageSize", strconv.Itoa(req.PageSize)).
Param("pageNo", strconv.Itoa(req.PageNo)).
Do().JSON(&pageResp)
Param("pageNo", strconv.Itoa(req.PageNo))
if req.Enable != nil {
request = request.Param("enable", strconv.FormatBool(*req.Enable))
}
httpResp, err := request.Do().JSON(&pageResp)
if err != nil {
return nil, apierrors.ErrInvoke.InternalError(err)
}
Expand Down
8 changes: 8 additions & 0 deletions conf/dop/dop.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ mysql:
username: "${MYSQL_USERNAME}"
password: "${MYSQL_PASSWORD}"
database: "${MYSQL_DATABASE}"
etcd:
endpoints: "${ETCD_ENDPOINTS:https://localhost:2379}"
tls:
cert_file: "${ETCD_CERT_FILE:/certs/etcd-client.pem}"
cert_key_file: "${ETCD_CERT_KEY_FILE:/certs/etcd-client-key.pem}"
ca_file: "${ETCD_CA_FILE:/certs/etcd-ca.pem}"

# pipeline cms
[email protected]:
Expand Down Expand Up @@ -43,3 +49,5 @@ component-protocol.components.issue-manage.issueOperations:
component-protocol.components.issue-manage.issueTable:
component-protocol.components.issue-manage.issueViewGroup:
component-protocol.components.issue-manage.topHead:


4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ require (
go.uber.org/atomic v1.8.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/ratelimit v0.2.0
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f
golang.org/x/net v0.0.0-20210908191846-a5e095526f91
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34 // indirect
golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365 // indirect
golang.org/x/text v0.3.7
google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83
google.golang.org/grpc v1.40.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1989,6 +1989,8 @@ golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f h1:w6wWR0H+nyVpbSAQbzVEIACVyr/h8l/BEkY6Sokc7Eg=
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210908191846-a5e095526f91 h1:E8wdt+zBjoxD3MA65wEc3pl25BsTi7tbkpwc4ANThjc=
golang.org/x/net v0.0.0-20210908191846-a5e095526f91/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180603041954-1e0a3fa8ba9a/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -2115,6 +2117,8 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34 h1:GkvMjFtXUmahfDtashnc1mnrCtuBVcwse5QV2lUk/tI=
golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365 h1:6wSTsvPddg9gc/mVEEyk9oOAoxn+bT4Z9q1zx+4RwA4=
golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE=
Expand Down
8 changes: 4 additions & 4 deletions modules/dop/endpoints/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (e *Endpoints) pipelineCreate(ctx context.Context, r *http.Request, vars ma
}

// update CmsNsConfigs
if err = e.updateCmsNsConfigs(identityInfo.UserID, app.OrgID); err != nil {
if err = e.UpdateCmsNsConfigs(identityInfo.UserID, app.OrgID); err != nil {
return errorresp.ErrResp(err)
}

Expand Down Expand Up @@ -379,7 +379,7 @@ func (e *Endpoints) pipelineRun(ctx context.Context, r *http.Request, vars map[s
}

// update CmsNsConfigs
if err = e.updateCmsNsConfigs(identityInfo.UserID, p.OrgID); err != nil {
if err = e.UpdateCmsNsConfigs(identityInfo.UserID, p.OrgID); err != nil {
return errorresp.ErrResp(err)
}

Expand Down Expand Up @@ -414,8 +414,8 @@ func (e *Endpoints) pipelineRun(ctx context.Context, r *http.Request, vars map[s
return httpserver.OkResp(nil)
}

// updateCmsNsConfigs update CmsNsConfigs
func (e *Endpoints) updateCmsNsConfigs(userID string, orgID uint64) error {
// UpdateCmsNsConfigs update CmsNsConfigs
func (e *Endpoints) UpdateCmsNsConfigs(userID string, orgID uint64) error {
members, err := e.bdl.GetMemberByUserAndScope(apistructs.OrgScope, userID, orgID)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion modules/dop/endpoints/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,5 @@ func TestUpdateCmsNsConfigsWhenUserNotExist(t *testing.T) {
})
defer monkey.UnpatchAll()
e := New()
assert.Equal(t, "the member is not exist", e.updateCmsNsConfigs("1", 1).Error())
assert.Equal(t, "the member is not exist", e.UpdateCmsNsConfigs("1", 1).Error())
}
2 changes: 1 addition & 1 deletion modules/dop/endpoints/pipelinecron.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (e *Endpoints) pipelineCronStart(ctx context.Context, r *http.Request, vars
if err != nil {
return errorresp.ErrResp(err)
}
if err = e.updateCmsNsConfigs(identityInfo.UserID, appDto.OrgID); err != nil {
if err = e.UpdateCmsNsConfigs(identityInfo.UserID, appDto.OrgID); err != nil {
return errorresp.ErrResp(err)
}

Expand Down
92 changes: 92 additions & 0 deletions modules/dop/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package dop

import (
"context"
"fmt"
"net/url"
"time"

Expand Down Expand Up @@ -84,6 +86,8 @@ import (
"github.com/erda-project/erda/pkg/ucauth"
)

const EtcdPipelineCmsCompensate = "dop/pipelineCms/compensate"

// Initialize 初始化应用启动服务.
func (p *provider) Initialize(ctx servicehub.Context) error {
conf.Load()
Expand Down Expand Up @@ -195,6 +199,27 @@ func (p *provider) Initialize(ctx servicehub.Context) error {
}
}()

// compensate pipeline cms according to pipeline cron which enable is true
go func() {
// add etcd lock to ensure that it is executed only once
resp, err := p.EtcdClient.Get(context.Background(), EtcdPipelineCmsCompensate)
if err != nil {
logrus.Error(err)
return
}
if len(resp.Kvs) == 0 {
logrus.Infof("start compensate pipelineCms")
if err = compensatePipelineCms(ep); err != nil {
logrus.Error(err)
}
_, err = p.EtcdClient.Put(context.Background(), EtcdPipelineCmsCompensate, "true")
if err != nil {
logrus.Error(err)
}
}

}()

// daily issue expiry status update cron job
go func() {
cron := cron.New()
Expand Down Expand Up @@ -621,3 +646,70 @@ func copyTestFileTask(ep *endpoints.Endpoints) {
}
ep.TestSetService().CopyTestSet(record)
}

// compensatePipelineCms compensate pipeline cms according to pipeline cron which enable is true
// it will be deprecated in the later version
func compensatePipelineCms(ep *endpoints.Endpoints) error {
enable := true
// get total
cron, err := bdl.Bdl.PageListPipelineCrons(apistructs.PipelineCronPagingRequest{
AllSources: false,
Sources: []apistructs.PipelineSource{apistructs.PipelineSourceDice},
YmlNames: nil,
PageSize: 1,
PageNo: 1,
Enable: &enable,
})
if err != nil {
logrus.Errorf("failed to PageListPipelineCrons, err: %s", err.Error())
return err
}
total := cron.Total
pageSize := 1000
crons := make([]*apistructs.PipelineCronDTO, 0, total)
for i := 0; i < int(total)/pageSize+1; i++ {
cron, err = bdl.Bdl.PageListPipelineCrons(apistructs.PipelineCronPagingRequest{
AllSources: true,
Sources: nil,
YmlNames: nil,
PageSize: pageSize,
PageNo: i + 1,
Enable: &enable,
})
if err != nil {
logrus.Errorf("failed to PageListPipelineCrons, err: %s", err.Error())
return err
}
crons = append(crons, cron.Data...)
}

// userOrgMap judge the user ns is compensated or not in the org
// key: userID-orgID, value: struct{}
userOrgMap := make(map[string]struct{})
for _, v := range crons {
if v.Enable != nil && *v.Enable &&
v.UserID != "" && v.OrgID != 0 {
ns := utils.MakeUserOrgPipelineCmsNs(v.UserID, v.OrgID)
if !strutil.InSlice(ns, v.ConfigManageNamespaces) {
err := bdl.Bdl.UpdatePipelineCron(apistructs.PipelineCronUpdateRequest{
ID: v.ID,
PipelineYml: v.PipelineYml,
CronExpr: v.CronExpr,
ConfigManageNamespaces: []string{utils.MakeUserOrgPipelineCmsNs(v.UserID, v.OrgID)},
})
if err != nil {
logrus.Errorf("failed to UpdatePipelineCron, err: %s", err.Error())
}
}
if _, ok := userOrgMap[fmt.Sprintf("%s-%d", v.UserID, v.OrgID)]; !ok {
userOrgMap[fmt.Sprintf("%s-%d", v.UserID, v.OrgID)] = struct{}{}
// the member may not exist
err = ep.UpdateCmsNsConfigs(v.UserID, v.OrgID)
if err != nil {
logrus.Errorf("failed to UpdateCmsNsConfigs, err: %s", err.Error())
}
}
}
}
return nil
}
63 changes: 63 additions & 0 deletions modules/dop/initialize_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2021 Terminus, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dop

import (
"reflect"
"testing"

"bou.ke/monkey"
"github.com/alecthomas/assert"

"github.com/erda-project/erda/apistructs"
"github.com/erda-project/erda/bundle"
"github.com/erda-project/erda/modules/dop/endpoints"
)

func TestCompensatePipelineCms(t *testing.T) {
ep := endpoints.New()
var bdl *bundle.Bundle
monkey.PatchInstanceMethod(reflect.TypeOf(bdl), "PageListPipelineCrons",
func(*bundle.Bundle, apistructs.PipelineCronPagingRequest) (*apistructs.PipelineCronPagingResponseData, error) {
return &apistructs.PipelineCronPagingResponseData{
Total: 3,
Data: []*apistructs.PipelineCronDTO{
{
UserID: "1",
OrgID: 1,
},
{
UserID: "1",
OrgID: 0,
},
{
UserID: "",
OrgID: 1,
},
},
}, nil
})
defer monkey.UnpatchAll()
monkey.PatchInstanceMethod(reflect.TypeOf(bdl), "UpdatePipelineCron",
func(*bundle.Bundle, apistructs.PipelineCronUpdateRequest) error {
return nil
})
monkey.PatchInstanceMethod(reflect.TypeOf(ep), "UpdateCmsNsConfigs",
func(*endpoints.Endpoints, string, uint64) error {
return nil
})
err := compensatePipelineCms(ep)
assert.NoError(t, err)
}
15 changes: 10 additions & 5 deletions modules/dop/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"os"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/jinzhu/gorm"
"github.com/sirupsen/logrus"

Expand All @@ -27,6 +28,7 @@ import (
"github.com/erda-project/erda-infra/base/version"
componentprotocol "github.com/erda-project/erda-infra/providers/component-protocol"
"github.com/erda-project/erda-infra/providers/component-protocol/protocol"
"github.com/erda-project/erda-infra/providers/etcd"
"github.com/erda-project/erda-infra/providers/i18n"
cmspb "github.com/erda-project/erda-proto-go/core/pipeline/cms/pb"
"github.com/erda-project/erda/bundle"
Expand All @@ -50,9 +52,11 @@ type provider struct {
PipelineDs definition_client.Processor `autowired:"erda.core.pipeline.definition-process-client"`
TestPlanSvc *testplan.TestPlanService `autowired:"erda.core.dop.autotest.testplan.TestPlanService"`

Protocol componentprotocol.Interface
Tran i18n.Translator `translator:"component-protocol"`
DB *gorm.DB `autowired:"mysql-client"`
Protocol componentprotocol.Interface
Tran i18n.Translator `translator:"component-protocol"`
DB *gorm.DB `autowired:"mysql-client"`
ETCD etcd.Interface // autowired
EtcdClient *clientv3.Client // autowired
}

func (p *provider) Init(ctx servicehub.Context) error {
Expand Down Expand Up @@ -103,7 +107,8 @@ func (p *provider) Init(ctx servicehub.Context) error {

func init() {
servicehub.Register("dop", &servicehub.Spec{
Services: []string{"dop"},
Creator: func() servicehub.Provider { return &provider{} },
Services: []string{"dop"},
Dependencies: []string{"etcd"},
Creator: func() servicehub.Provider { return &provider{} },
})
}
Loading

0 comments on commit d85b150

Please sign in to comment.