-
Notifications
You must be signed in to change notification settings - Fork 62
/
Copy pathjson.lua
89 lines (74 loc) · 2.42 KB
/
json.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
---
-- Sink a signal to a file, serialized in JSON. Samples are serialized
-- individually and newline delimited. This sink accepts any data type that
-- implements `to_json()`.
--
-- @category Sinks
-- @block JSONSink
-- @tparam[opt=io.stdout] string|file|int file Filename, file object, or file descriptor
--
-- @signature in:supported >
--
-- @usage
-- -- Sink JSON serialized samples to stdout
-- local snk = radio.JSONSink()
-- top:connect(src, snk)
--
-- -- Sink JSON serialized samples to a file
-- local snk = radio.JSONSink('out.json')
-- top:connect(src, snk)
local ffi = require('ffi')
local block = require('radio.core.block')
local JSONSink = block.factory("JSONSink")
function JSONSink:instantiate(file)
-- Default to io.stdout
self.file = file or io.stdout
-- Accept all input types that implement to_json()
self:add_type_signature({block.Input("in", function (type) return type.to_json ~= nil end)}, {})
end
function JSONSink:initialize()
if type(self.file) == "number" then
-- file descriptor
self.file = ffi.C.fdopen(self.file, "wb")
if self.file == nil then
error("fdopen(): " .. ffi.string(ffi.C.strerror(ffi.errno())))
end
elseif type(self.file) == "string" then
-- path
self.file = ffi.C.fopen(self.file, "wb")
if self.file == nil then
error("fopen(): " .. ffi.string(ffi.C.strerror(ffi.errno())))
end
end
-- Save file descriptor
self.fd = ffi.C.fileno(self.file)
-- Register open file
self.files[self.file] = true
end
function JSONSink:process(x)
-- Lock file
if ffi.C.flock(self.fd, ffi.C.LOCK_EX) ~= 0 then
error("flock(): " .. ffi.string(ffi.C.strerror(ffi.errno())))
end
for i = 0, x.length-1 do
local s = x.data[i]:to_json() .. "\n"
-- Write to file
if ffi.C.fwrite(s, 1, #s, self.file) ~= #s then
error("fwrite(): " .. ffi.string(ffi.C.strerror(ffi.errno())))
end
end
-- Flush file
if ffi.C.fflush(self.file) ~= 0 then
error("fflush(): " .. ffi.string(ffi.C.strerror(ffi.errno())))
end
-- Unlock file
if ffi.C.flock(self.fd, ffi.C.LOCK_UN) ~= 0 then
error("flock(): " .. ffi.string(ffi.C.strerror(ffi.errno())))
end
end
function JSONSink:cleanup()
if ffi.C.fclose(self.file) ~= 0 then
error("fclose(): " .. ffi.string(ffi.C.strerror(ffi.errno())))
end
end
return JSONSink