forked from appscode/g2
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathinpack.go
131 lines (118 loc) · 2.96 KB
/
inpack.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package worker
import (
"bytes"
"encoding/binary"
"fmt"
"strconv"
rt "github.com/quantcast/g2/pkg/runtime"
)
// inPack is a packet as seen from the worker side. decodeInPack fills in
// dataType and data for every packet, and additionally handle, fn and
// uniqueId for job-assignment packets. The agent a is the connection that
// SendData, SendWarning and UpdateStatus write their replies to.
type inPack struct {
	dataType rt.PT
	data     []byte
	handle   string
	uniqueId string
	fn       string
	a        *agent
}
// getInPack returns a pointer to a freshly allocated inPack with every field
// at its zero value.
func getInPack() *inPack {
	return new(inPack)
}
// Data returns the packet's data field. decodeInPack sets it to the last
// NUL-separated field of a job-assignment payload and to the whole payload
// for every other packet type.
func (p *inPack) Data() []byte {
	return p.data
}
// Fn returns the packet's fn field, which decodeInPack fills from the second
// NUL-separated field of a job-assignment payload.
func (p *inPack) Fn() string {
	return p.fn
}
// Handle returns the packet's handle field, which decodeInPack fills from the
// first NUL-separated field of a job-assignment payload.
func (p *inPack) Handle() string {
	return p.handle
}
// UniqueId returns the packet's uniqueId field, which decodeInPack fills from
// the third NUL-separated field of a PT_JobAssignUniq payload.
func (p *inPack) UniqueId() string {
	return p.uniqueId
}
// Err reports the error carried by the packet. It returns nil unless the
// packet type is rt.PT_Error, in which case it returns whatever getError
// builds from the packet data.
func (p *inPack) Err() error {
	if p.dataType != rt.PT_Error {
		return nil
	}
	return getError(p.data)
}
// SendData sends data to the client while a job is executing.
//
// It writes an rt.PT_WorkData packet to the packet's agent. The payload is
// the job handle, one separator byte, then data. The separator byte is never
// written here, so it keeps whatever value rt.NewBuffer gave it (presumably
// zero, i.e. a NUL separator; confirm against rt.NewBuffer).
func (p *inPack) SendData(data []byte) {
	reply := getOutPack()
	reply.dataType = rt.PT_WorkData

	handleLen := len(p.handle)
	buf := rt.NewBuffer(handleLen + len(data) + 1)
	copy(buf, p.handle)
	copy(buf[handleLen+1:], data)
	reply.data = buf

	p.a.Write(reply)
}
// SendWarning sends a warning to the client while a job is executing.
//
// It writes an rt.PT_WorkWarning packet to the packet's agent. The payload
// is the job handle, one separator byte, then data. The separator byte is
// never written here, so it keeps whatever value rt.NewBuffer gave it
// (presumably zero, i.e. a NUL separator; confirm against rt.NewBuffer).
func (p *inPack) SendWarning(data []byte) {
	reply := getOutPack()
	reply.dataType = rt.PT_WorkWarning

	handleLen := len(p.handle)
	buf := rt.NewBuffer(handleLen + len(data) + 1)
	copy(buf, p.handle)
	copy(buf[handleLen+1:], data)
	reply.data = buf

	p.a.Write(reply)
}
// UpdateStatus tells the client how much of the job has been executed, as
// the fraction numerator/denominator.
//
// It writes an rt.PT_WorkStatus packet to the packet's agent. The payload is
// the job handle, the numerator and the denominator (both in decimal text),
// with one separator byte between consecutive fields. The separator bytes
// are never written here, so they keep whatever value rt.NewBuffer gave them
// (presumably zero, i.e. NUL separators; confirm against rt.NewBuffer).
func (p *inPack) UpdateStatus(numerator, denominator int) {
	num := strconv.Itoa(numerator)
	den := strconv.Itoa(denominator)

	reply := getOutPack()
	reply.dataType = rt.PT_WorkStatus

	numAt := len(p.handle) + 1
	denAt := numAt + len(num) + 1
	buf := rt.NewBuffer(denAt + len(den))
	copy(buf, p.handle)
	copy(buf[numAt:], num)
	copy(buf[denAt:], den)
	reply.data = buf

	p.a.Write(reply)
}
// decodeInPack decodes one packet from the front of data.
//
// Layout as read here: bytes 4-8 hold the packet type and bytes 8-12 the
// payload length, both big-endian; the payload starts after the
// rt.MinPacketLength-byte header. Bytes 0-4 are not inspected.
//
// On success it returns the packet and l, the number of bytes of data the
// packet occupies (header plus payload), so the caller can continue with
// data[l:]. It returns an error when data is shorter than a header, when the
// declared payload is not fully present, or when rt.NewPT rejects the packet
// type; l is 0 in every error case.
//
// For rt.PT_JobAssign the payload is split on NUL into handle, function name
// and job data; for rt.PT_JobAssignUniq into handle, function name, unique id
// and job data. A job-assignment payload with too few fields is not an
// error: those fields are simply left empty. Every other packet type keeps
// the whole payload as its data. The returned data aliases the input slice.
func decodeInPack(data []byte) (inpack *inPack, l int, err error) {
	if len(data) < rt.MinPacketLength { // no complete header yet
		return nil, 0, fmt.Errorf("Invalid data: %v", data)
	}

	dl := int(binary.BigEndian.Uint32(data[8:12]))
	if dl < 0 {
		// Only possible where int is 32 bits wide and the declared length
		// exceeds its range; such a packet can never be satisfied, and a
		// negative length would make the slice expression below panic.
		return nil, 0, fmt.Errorf("Invalid data: %v", data)
	}
	// Compare against the bytes remaining after the header instead of
	// computing dl+header first, so a huge dl cannot overflow the sum.
	if len(data)-rt.MinPacketLength < dl {
		return nil, 0, fmt.Errorf("Not enough data: %v", data)
	}
	l = rt.MinPacketLength + dl
	dt := data[rt.MinPacketLength:l]

	inpack = getInPack()
	inpack.dataType, err = rt.NewPT(binary.BigEndian.Uint32(data[4:8]))
	if err != nil {
		// Nothing is consumed; callers must check err before using inpack.
		return inpack, 0, err
	}

	switch inpack.dataType {
	case rt.PT_JobAssign:
		if s := bytes.SplitN(dt, []byte{'\x00'}, 3); len(s) == 3 {
			inpack.handle = string(s[0])
			inpack.fn = string(s[1])
			inpack.data = s[2]
		}
	case rt.PT_JobAssignUniq:
		if s := bytes.SplitN(dt, []byte{'\x00'}, 4); len(s) == 4 {
			inpack.handle = string(s[0])
			inpack.fn = string(s[1])
			inpack.uniqueId = string(s[2])
			inpack.data = s[3]
		}
	default:
		inpack.data = dt
	}
	return inpack, l, nil
}