-
Notifications
You must be signed in to change notification settings - Fork 5
/
attempt.go
176 lines (146 loc) · 4.4 KB
/
attempt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package digdag
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
uuid "github.com/satori/go.uuid"
)
// attemptsWrapper mirrors the JSON envelope decoded by GetAttempts: an object
// whose "attempts" key holds the list of attempts.
type attemptsWrapper struct {
Attempts []*Attempt `json:"attempts"`
}
// Attempt is the struct for a digdag attempt. It is the element type of the
// "attempts" list decoded by GetAttempts and the value decoded by
// CreateNewAttempt. GetAttempts additionally reads Project.Name and
// Workflow.Name from it to build the request filter.
type Attempt struct {
ID string `json:"id"`
Index int `json:"index"`
// Project identifies the project the attempt belongs to.
Project struct {
ID string `json:"id"`
Name string `json:"name"`
} `json:"project"`
// Workflow identifies the workflow the attempt belongs to.
Workflow struct {
Name string `json:"name"`
ID string `json:"id"`
} `json:"workflow"`
SessionID string `json:"sessionId"`
SessionUUID string `json:"sessionUuid"`
// SessionTime is kept as the raw string from the payload; GetAttemptIDs
// compares it with plain string equality.
SessionTime string `json:"sessionTime"` // FIXME fix type from string to time.Time
// RetryAttemptName is untyped; with omitempty it is left out of encoded
// output when nil.
RetryAttemptName interface{} `json:"retryAttemptName,omitempty"`
Done bool `json:"done"`
Success bool `json:"success"`
CancelRequested bool `json:"cancelRequested"`
// NOTE(review): map[string]string only decodes JSON string values; confirm
// the server never sends numeric or nested params here.
Params map[string]string `json:"params"`
// CreatedAt and FinishedAt are carried as strings and are not parsed in
// this file.
CreatedAt string `json:"createdAt"`
FinishedAt string `json:"finishedAt"`
}
// CreateAttempt is the request payload that CreateNewAttempt marshals to JSON
// when asking for a new attempt.
type CreateAttempt struct {
	WorkflowID       string            `json:"workflowId"`
	SessionTime      string            `json:"sessionTime"`
	RetryAttemptName string            `json:"retryAttemptName,omitempty"`
	Params           map[string]string `json:"params"`
}

// NewCreateAttempt returns a CreateAttempt populated with the given workflow
// ID, session time and retry attempt name. Params is initialised to an empty,
// non-nil map so callers can assign into it right away.
func NewCreateAttempt(workflowID, sessionTime, retryAttemptName string) *CreateAttempt {
	req := new(CreateAttempt)
	req.WorkflowID = workflowID
	req.SessionTime = sessionTime
	req.RetryAttemptName = retryAttemptName
	req.Params = make(map[string]string)
	return req
}
// GetAttempts requests attempts from GET /api/attempts.
//
// The Project.Name and Workflow.Name fields of attempt are passed as the
// "project" and "workflow" request params, and includeRetried as
// "include_retried". A nil attempt is treated as an empty filter, so both
// names are sent as empty strings.
//
// It returns the decoded attempts, or an error when the request fails, the
// body cannot be decoded, or the response holds no attempts.
func (c *Client) GetAttempts(attempt *Attempt, includeRetried bool) ([]*Attempt, error) {
	spath := "/api/attempts"

	if attempt == nil {
		attempt = new(Attempt)
	}
	project := attempt.Project.Name
	workflow := attempt.Workflow.Name

	ro := &RequestOpts{
		Params: map[string]string{
			"project":         project,
			"workflow":        workflow,
			"include_retried": strconv.FormatBool(includeRetried),
		},
	}

	resp, err := c.NewRequest(http.MethodGet, spath, ro)
	if err != nil {
		return nil, err
	}

	// Decode into a value instead of a nil pointer: a body that leaves the
	// target untouched (e.g. JSON null, assuming decodeBody is JSON-based)
	// would otherwise cause a nil dereference on aw.Attempts below.
	var aw attemptsWrapper
	if err := decodeBody(resp, &aw); err != nil {
		return nil, err
	}

	// No attempts found is reported as an error rather than an empty slice.
	if len(aw.Attempts) == 0 {
		return nil, fmt.Errorf("attempts does not exist. project=%s workflow=%s", project, workflow)
	}

	return aw.Attempts, nil
}
// GetAttemptIDs returns the IDs of every attempt (retried ones included) of
// the given project and workflow whose SessionTime equals targetSession.
//
// Errors from GetAttempts are returned as-is with a nil slice. When no attempt
// matches, an empty slice and a "does not exist" error are returned.
func (c *Client) GetAttemptIDs(projectName, workflowName, targetSession string) (attemptIDs []string, err error) {
	filter := &Attempt{}
	filter.Project.Name = projectName
	filter.Workflow.Name = workflowName

	found, err := c.GetAttempts(filter, true)
	if err != nil {
		return nil, err
	}

	// Session times are compared as raw strings, exactly as received.
	for _, a := range found {
		if a.SessionTime != targetSession {
			continue
		}
		attemptIDs = append(attemptIDs, a.ID)
	}

	if len(attemptIDs) > 0 {
		return attemptIDs, nil
	}

	// Nothing matched the requested session time.
	return []string{}, fmt.Errorf("attempts does not exist. project=%s workflow=%s sessionTime=%s", projectName, workflowName, targetSession)
}
// CreateNewAttempt asks the server to start a new attempt of the workflow
// workflowID for sessionTime by sending a PUT to /api/attempts.
//
// params holds "key=value" strings that are copied into the request's Params
// map. Only the first "=" separates key from value, so values may themselves
// contain "="; entries without any "=" are ignored.
//
// When retry is true, a freshly generated UUID is sent as the retry attempt
// name.
//
// It returns the decoded attempt on success. If the request fails with HTTP
// 409 Conflict (the session already exists), it returns a nil attempt with
// done set to true and a nil error. Any other failure is returned as err.
func (c *Client) CreateNewAttempt(workflowID, sessionTime string, params []string, retry bool) (attempt *Attempt, done bool, err error) {
	spath := "/api/attempts"

	ca := NewCreateAttempt(workflowID, sessionTime, "")

	// Set params. SplitN with a limit of 2 keeps any further "=" inside the
	// value instead of silently truncating it.
	for _, p := range params {
		kv := strings.SplitN(p, "=", 2)
		if len(kv) != 2 {
			continue
		}
		ca.Params[kv[0]] = kv[1]
	}

	// Retry workflow
	if retry {
		// TODO: add retry attempt name (optional)
		generatedUUID := uuid.NewV4()
		ca.RetryAttemptName = generatedUUID.String()
	}

	// Create new attempt
	body, err := json.Marshal(ca)
	if err != nil {
		return nil, false, err
	}

	ro := &RequestOpts{
		Body: bytes.NewBuffer(body),
	}

	resp, err := c.NewRequest(http.MethodPut, spath, ro)
	if err != nil {
		// A conflict means the session already exists. resp may be nil when
		// the request failed before any response was received, so guard the
		// status check to avoid a nil dereference.
		if resp != nil && resp.StatusCode == http.StatusConflict {
			return nil, true, nil
		}
		return nil, false, err
	}

	if err := decodeBody(resp, &attempt); err != nil {
		return nil, false, err
	}

	return attempt, false, nil
}