Skip to content

Commit 6ef1875

Browse files
committed
Add Redix.PubSub.ping/2
Closes #274 (because it supersedes it).
1 parent 7a538ce commit 6ef1875

3 files changed

Lines changed: 123 additions & 1 deletion

File tree

lib/redix/pubsub.ex

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ defmodule Redix.PubSub do
180180
@type subscriber() :: pid() | port() | atom() | {atom(), node()}
181181
@type connection() :: GenServer.server()
182182

183+
alias Redix.ConnectionError
183184
alias Redix.StartOptions
184185

185186
@doc """
@@ -329,6 +330,33 @@ defmodule Redix.PubSub do
329330
:gen_statem.call(conn, {:subscribe, List.wrap(channels), subscriber})
330331
end
331332

333+
@doc """
334+
Sends a `PING` to the Redis server and waits for a `PONG` reply.
335+
336+
Upon a successful reply from the server, returns `:ok`. If no reply is received
337+
within the given timeout, returns `:error`.
338+
339+
This is useful for periodic health checks of the underlying connection. `PING` is
340+
one of the few commands that Redis allows on a pub/sub connection.
341+
342+
## Examples
343+
344+
iex> Redix.PubSub.ping(pubsub)
345+
:ok
346+
347+
iex> Redix.PubSub.ping(pubsub, 1000)
348+
:ok
349+
350+
"""
351+
@doc since: "1.6.0"
352+
@spec ping(connection(), timeout()) :: :ok | {:error, ConnectionError.t()}
353+
def ping(conn, timeout \\ 5000)
354+
when (is_integer(timeout) and timeout >= 0) or timeout == :infinity do
355+
:gen_statem.call(conn, :ping, timeout)
356+
catch
357+
:exit, {:timeout, _} -> {:error, %ConnectionError{reason: :timeout}}
358+
end
359+
332360
@doc """
333361
Subscribes `subscriber` to the given pattern or list of patterns.
334362

lib/redix/pubsub/connection.ex

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ defmodule Redix.PubSub.Connection do
1515
:connected_address,
1616
:client_id,
1717
subscriptions: %{},
18-
monitors: %{}
18+
monitors: %{},
19+
ping_callers: :queue.new()
1920
]
2021

2122
@backoff_exponent 1.5
@@ -187,6 +188,10 @@ defmodule Redix.PubSub.Connection do
187188
{:keep_state, data}
188189
end
189190

191+
def disconnected({:call, from}, :ping, _data) do
192+
{:keep_state_and_data, {:reply, from, {:error, %ConnectionError{reason: :closed}}}}
193+
end
194+
190195
def disconnected({:call, from}, :get_client_id, _data) do
191196
reply = {:error, %ConnectionError{reason: :closed}}
192197
{:keep_state_and_data, {:reply, from, reply}}
@@ -234,6 +239,18 @@ defmodule Redix.PubSub.Connection do
234239
end
235240
end
236241

242+
def connected({:call, from}, :ping, data) do
243+
data = %{data | ping_callers: :queue.in(from, data.ping_callers)}
244+
245+
case data.transport.send(data.socket, Protocol.pack(["PING"])) do
246+
:ok ->
247+
{:keep_state, data}
248+
249+
{:error, reason} ->
250+
disconnect(data, reason, _handle_disconnection? = true)
251+
end
252+
end
253+
237254
def connected({:call, from}, :get_client_id, data) do
238255
reply =
239256
if id = data.client_id do
@@ -345,6 +362,13 @@ defmodule Redix.PubSub.Connection do
345362
end
346363
end
347364

365+
# When a Redis connection is in pub/sub mode (subscribed to at least one
366+
# channel), Redis wraps PING responses as pub/sub messages: ["pong", ""]. When
367+
# not subscribed to anything, Redis returns a plain +PONG\r\n simple string,
368+
# which parses to "PONG".
369+
defp handle_pubsub_msg(data, ["pong", _message]), do: handle_pong(data)
370+
defp handle_pubsub_msg(data, "PONG"), do: handle_pong(data)
371+
348372
defp handle_pubsub_msg(data, ["message", channel, payload]) do
349373
properties = %{channel: channel, payload: payload}
350374
handle_pubsub_message_with_payload(data, {:channel, channel}, :message, properties)
@@ -369,6 +393,31 @@ defmodule Redix.PubSub.Connection do
369393
{:ok, data}
370394
end
371395

396+
# Ping handling.
397+
398+
defp handle_pong(data) do
399+
{popped, data} = get_and_update_in(data.ping_callers, &:queue.out/1)
400+
401+
# :empty is not supposed to happen but we're not gonna crash the process
402+
# if it does.
403+
case popped do
404+
{:value, from} -> :gen_statem.reply(from, :ok)
405+
:empty -> :ok
406+
end
407+
408+
{:ok, data}
409+
end
410+
411+
defp drain_ping_callers(data) do
412+
error = {:error, %ConnectionError{reason: :closed}}
413+
414+
data.ping_callers
415+
|> :queue.to_list()
416+
|> Enum.each(&:gen_statem.reply(&1, error))
417+
418+
%{data | ping_callers: :queue.new()}
419+
end
420+
372421
# Subscribing.
373422

374423
defp subscribe_pid_to_targets(data, operation, targets, pid) do
@@ -622,6 +671,7 @@ defmodule Redix.PubSub.Connection do
622671
end
623672

624673
def disconnect(data, reason, handle_disconnection?) do
674+
data = drain_ping_callers(data)
625675
{next_backoff, data} = next_backoff(data)
626676

627677
if data.socket do

test/redix/pubsub_test.exs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,50 @@ defmodule Redix.PubSubTest do
365365
refute new_state.continuation
366366
end
367367

368+
describe "ping/2" do
369+
test "returns :ok when connected", %{pubsub: pubsub} do
370+
assert PubSub.ping(pubsub) == :ok
371+
assert PubSub.ping(pubsub, _with_custom_timeout = 10_000) == :ok
372+
assert PubSub.ping(pubsub, _with_custom_timeout = :infinity) == :ok
373+
end
374+
375+
test "returns :ok when subscribed to channels", %{pubsub: pubsub} do
376+
assert {:ok, ref} = PubSub.subscribe(pubsub, "foo", self())
377+
assert_receive {:redix_pubsub, ^pubsub, ^ref, :subscribed, %{channel: "foo"}}
378+
assert PubSub.ping(pubsub) == :ok
379+
end
380+
381+
test "returns :error when disconnected" do
382+
{:ok, pubsub} = PubSub.start_link(port: 9999)
383+
assert PubSub.ping(pubsub) == {:error, %ConnectionError{reason: :closed}}
384+
end
385+
386+
@tag :capture_log
387+
test "returns :error when connection goes down", %{pubsub: pubsub, conn: conn} do
388+
assert {:ok, ref} = PubSub.subscribe(pubsub, "foo", self())
389+
assert_receive {:redix_pubsub, ^pubsub, ^ref, :subscribed, %{channel: "foo"}}
390+
391+
Redix.command!(conn, ~w(CLIENT KILL TYPE pubsub))
392+
assert_receive {:redix_pubsub, ^pubsub, ^ref, :disconnected, _properties}
393+
394+
assert PubSub.ping(pubsub) == {:error, %ConnectionError{reason: :closed}}
395+
end
396+
397+
test "returns :error when timeout expires", %{pubsub: pubsub} do
398+
assert PubSub.ping(pubsub, 0) == {:error, %ConnectionError{reason: :timeout}}
399+
end
400+
401+
test "multiple concurrent pings return :ok", %{pubsub: pubsub} do
402+
tasks =
403+
for _ <- 1..5 do
404+
Task.async(fn -> PubSub.ping(pubsub) end)
405+
end
406+
407+
results = Task.await_many(tasks)
408+
assert Enum.all?(results, &(&1 == :ok))
409+
end
410+
end
411+
368412
defp wait_until_passes(timeout, fun) when timeout <= 0 do
369413
fun.()
370414
end

0 commit comments

Comments
 (0)