Skip to content

Commit

Permalink
update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
NickMcSweeney committed Jun 19, 2023
1 parent ca86529 commit eb1c7d9
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 40 deletions.
11 changes: 3 additions & 8 deletions src/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,15 @@ struct User
end

"""
Client(on_msg::Function[, ping_timeout::UInt64])
Client([ping_timeout::UInt64])
A mutable struct that represents an MQTT client.
# Arguments
- `on_msg::Function`: A function that will be called when a message is received.
- `ping_timeout::UInt64`: The number of seconds to wait for a ping response before disconnecting. Default is 60.
# Fields
- `on_msg::Function`: The function that will be called when a message is received.
- `on_msg::Dict{String,Function}`: A dictionary of functions that will be called when a message is received.
- `keep_alive::UInt16`: The number of seconds between pings.
- `last_id::UInt16`: The last message ID used.
- `in_flight::Dict{UInt16, Future}`: A dictionary of messages that are waiting for a response.
Expand All @@ -164,11 +163,7 @@ A mutable struct that represents an MQTT client.
# Examples
```julia
julia> client = Client(on_msg)
Client(on_msg, 60)
julia> client = Client(on_msg, 30)
Client(on_msg, 30)
julia> client = Client()
```
"""
mutable struct Client
Expand Down
71 changes: 39 additions & 32 deletions src/interface.jl
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
"""
MQTTConnection(on_msg::Function)
MQTTConnection(;ping_timeout=UInt64(60))
Create a new `Client` object with the specified `on_msg` function.
Create a new `Client` object with the specified `ping_timeout` (optional).
# Arguments
- `on_msg::Function`: The function to be called when a message is received.
# Keyword Arguments
- `ping_timeout::UInt64=60`: The number of seconds to wait for a ping response before disconnecting. Default is 60.
# Examples
```julia
client = MQTTConnection(on_msg)
client = MQTTConnection()
client = MQTTConnection(ping_timeout=30)
```
"""
MQTTConnection(;ping_timeout=UInt64(60)) = Client(ping_timeout)
Expand Down Expand Up @@ -138,6 +139,25 @@ function disconnect(client::Client)
#wait(client.socket.closenotify)
end

"""
subscribe_async(client::Client, topic::String, on_msg::Function; qos::UInt8=QOS_0)
Subscribe to a topic asynchronously.
# Arguments
- `client::Client`: The MQTT client.
- `topic::String`: The topic to subscribe to.
- `on_msg::Function`: The function to call when a message is received on the topic.
- `qos::UInt8`: The quality of service level to use for the subscription. Default is 0.
# Returns
- `Future`: A future that can be used to wait for the subscription to complete.
# Examples
```julia
future = subscribe_async(client, "my/topic", on_msg, qos=QOS_2)
```
"""
function subscribe_async(client, topic, on_msg; qos=QOS_0)
future = Future()
id = packet_id(client)
Expand All @@ -147,43 +167,30 @@ function subscribe_async(client, topic, on_msg; qos=QOS_0)
return future
end

subscribe(client, topic, on_msg; qos=QOS_0) = resolve(subscribe_async(client, topic, on_msg, qos=qos))

"""
subscribe_async(client::Client, topics::Tuple{String, QOS}...)
subscribe(client::Client, topic::String, on_msg::Function; qos::UInt8=QOS_0)
Subscribes the `Client` instance to the supplied topic tuples.
Returns a `Future` object that contains the actually received QOS levels for each topic on success. Contains an exception on failure
"""
# function subscribe_async(client::Client, topics::Tuple{String, QOS}...)
# future = Future()
# id = packet_id(client)
# client.in_flight[id] = future
# topic_data = []
# for t in topics
# for data in t
# push!(topic_data, data)
# end
# end
# write_packet(client, SUBSCRIBE | 0x02, id, topic_data...)
# return future
# end
Subscribe to a topic.
"""
subscribe(client::Client, topics::Tuple{String, QOS}...)
# Arguments
- `client::Client`: The MQTT client.
- `topic::String`: The topic to subscribe to.
- `on_msg::Function`: The function to call when a message is received on the topic.
- `qos::UInt8`: The quality of service level to use for the subscription. Default is 0.
Waits until the subscribe is fully acknowledged. Returns the actually received QOS levels for each topic on success.
Contains an exception on failure.
# Examples
```julia
subscribe(client, "my/topic", on_msg)
```
"""
# function subscribe(client::Client, topics::Tuple{String, QOS}...)
# v = fetch(subscribe_async(client, topics...))
# v
# end
subscribe(client, topic, on_msg; qos=QOS_0) = resolve(subscribe_async(client, topic, on_msg, qos=qos))


"""
unsubscribe_async(client::Client, topics::String...)
Unsubscribes the `Client` instance from the supplied topic names.
Deletes the callback from the client
Returns a `Future` object that contains `nothing` on success and an exception on failure.
"""
function unsubscribe_async(client::Client, topics::String...)
Expand Down
55 changes: 55 additions & 0 deletions test/smoke.jl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,61 @@ try
@test take!(client_test_res)
end
end

@testset "stress test" begin
println("Running stress tests")

condition = Condition()
topic1 = "foo"
topic2 = "bar"
topic3 = Random.randstring(4)
topic4 = Random.randstring(8)
topic5 = Random.randstring(16)
topic6 = Random.randstring(32)
payload = Random.randstring(64)
client_test_res = Channel{Tuple}(512)

function on_msg(t, p)
msg = p |> String
# println("Received message topic: [", t, "] payload: [", msg, "]")
put!(client_test_res, (t,msg))
end

client = Client()

connect(client, MQTT_BROKER, MQTT_PORT)
sleep(0.5)

@time subscribe(client, topic1, on_msg, qos=QOS_2)
@time subscribe(client, topic2, on_msg, qos=QOS_2)
@time subscribe(client, topic3, on_msg, qos=QOS_2)
@time subscribe(client, topic4, on_msg, qos=QOS_2)
@time subscribe(client, topic5, on_msg, qos=QOS_2)
@time subscribe(client, topic6, on_msg, qos=QOS_2)

@time for i in 1:256
if i%6 == 1
publish_async(client, topic1, payload)
elseif i%6 == 2
publish_async(client, topic2, payload)
elseif i%6 == 3
publish_async(client, topic3, payload)
elseif i%6 == 4
publish_async(client, topic4, payload)
elseif i%6 == 5
publish_async(client, topic5, payload)
else
publish_async(client, topic6, payload)
end
end
count = 0
@time while count < 256
wait(client_test_res)
t,p = take!(client_test_res)
@test p == payload
count += 1
end
end
catch e
println("$MQTT_BROKER:$MQTT_PORT not online -- skipping smoke test")
@error e
Expand Down

0 comments on commit eb1c7d9

Please sign in to comment.