Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ jobs:
with:
repository: upstash/upstash-redis

# The following tests fail because of bugs with Upstash's implementation of Redis, NOT because of our library
# So we remove them from the test suite
- name: Remove JSON tests
run: |
rm ./pkg/commands/json_get.test.ts
rm ./pkg/commands/json_mget.test.ts
rm ./pkg/commands/json_objlen.test.ts

- name: Run @upstash/redis Test Suite
run: deno test -A ./pkg
env:
Expand Down
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ srh-*.tar

*.iml

srh-config/
srh-config/

test-project/
38 changes: 20 additions & 18 deletions lib/srh/auth/token_resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ defmodule Srh.Auth.TokenResolver do
{srh_max_connections, ""} = Integer.parse(System.get_env("SRH_MAX_CONNECTIONS", "3"))

# Create a config-file-like structure that the ETS layout expects, with just one entry
config_file_data = Map.put(%{}, srh_token, %{
"srh_id" => "env_config_connection", # Jason.parse! expects these keys to be strings, not atoms, so we need to replicate that setup
"connection_string" => srh_connection_string,
"max_connections" => srh_max_connections
})
config_file_data =
Map.put(%{}, srh_token, %{
# Jason.parse! expects these keys to be strings, not atoms, so we need to replicate that setup
"srh_id" => "env_config_connection",
"connection_string" => srh_connection_string,
"max_connections" => srh_max_connections
})

IO.puts("Loaded config from env. #{map_size(config_file_data)} entries.")
# Load this into ETS
Expand All @@ -98,17 +100,17 @@ defmodule Srh.Auth.TokenResolver do
# The env strategy uses the same ETS table as the file strategy, so we can fall back on that
defp do_resolve("env", token), do: do_resolve("file", token)

defp do_resolve("redis", _token) do
{
:ok,
# This is done to replicate what will eventually be API endpoints, so they keys are not atoms
Jason.decode!(
Jason.encode!(%{
srh_id: "1000",
connection_string: "redis://localhost:6379",
max_connections: 10
})
)
}
end
# defp do_resolve("redis", _token) do
# {
# :ok,
# # This is done to replicate what will eventually be API endpoints, so they keys are not atoms
# Jason.decode!(
# Jason.encode!(%{
# srh_id: "1000",
# connection_string: "redis://localhost:6379",
# max_connections: 10
# })
# )
# }
# end
end
10 changes: 8 additions & 2 deletions lib/srh/http/base_router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ defmodule Srh.Http.BaseRouter do
|> get_req_header("upstash-encoding")
|> RequestValidator.validate_encoding_header() do
{:ok, _encoding_enabled} -> true
{:error, _} -> false # it's not required to be present
# it's not required to be present
{:error, _} -> false
end
end

Expand All @@ -63,7 +64,9 @@ defmodule Srh.Http.BaseRouter do
true ->
# We need to use the encoder to
ResultEncoder.encode_response(response)
false -> response

false ->
response
end
end

Expand All @@ -85,6 +88,9 @@ defmodule Srh.Http.BaseRouter do
{:not_authorized, message} ->
Copy link
Contributor

@jahands jahands Aug 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to encode other errors the same way? (Maybe in a separate PR?)
Edit: oops, was 2 minutes too late :)

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haha, yes, I can do another PR for error overhaul and clean this all up

%{code: 401, message: message, json: false}

{:connection_error, message} ->
%{code: 500, message: Jason.encode!(%{error: message}), json: true}

{:server_error, _} ->
%{code: 500, message: "An error occurred internally", json: false}

Expand Down
108 changes: 74 additions & 34 deletions lib/srh/http/command_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,20 @@ defmodule Srh.Http.CommandHandler do
{:ok, result_map} ->
[result_map | responses]

{:connection_error, result} ->
{:connection_error, result}

{:redis_error, result} ->
[result | responses]
end

dispatch_command_array(rest, connection_info, updated_responses)
case updated_responses do
{:connection_error, result} ->
{:connection_error, result}

_ ->
dispatch_command_array(rest, connection_info, updated_responses)
end
end

defp dispatch_command_array([], _connection_info, responses) do
Expand All @@ -95,43 +104,56 @@ defmodule Srh.Http.CommandHandler do
# Borrow a client, then run all of the commands (wrapped in MULTI and EXEC)
worker_pid = Client.borrow_worker(client_pid)

wrapped_command_array = [["MULTI"] | command_array]
do_dispatch_command_transaction_array(wrapped_command_array, worker_pid, responses)
# We are manually going to invoke the MULTI, because there might be a connection error to the Redis server.
# In that case, we don't want the error to be wound up in the array of errors,
# we instead want to return the error immediately.
case ClientWorker.redis_command(worker_pid, ["MULTI"]) do
{:ok, _} ->
do_dispatch_command_transaction_array(command_array, worker_pid, responses)

# Now manually run the EXEC - this is what contains the information to form the response, not the above
result = case ClientWorker.redis_command(worker_pid, ["EXEC"]) do
{:ok, res} ->
{
:ok,
res
|> Enum.map(&(%{result: &1}))
}
# TODO: Can there be any inline errors here? Wouldn't they fail the whole tx?
# Now manually run the EXEC - this is what contains the information to form the response, not the above
result =
case ClientWorker.redis_command(worker_pid, ["EXEC"]) do
{:ok, res} ->
{
:ok,
res
|> Enum.map(&%{result: &1})
}

{:error, error} ->
decode_error(error, srh_id)
end

Client.return_worker(client_pid, worker_pid)

# Fire back the result here, because the initial Multi was successful
result

{:error, error} ->
{:redis_error, %{error: error.message}}
decode_error(error, srh_id)
end

Client.return_worker(client_pid, worker_pid)

result
{:error, msg} ->
{:server_error, msg}
end
end

defp do_dispatch_command_transaction_array([current | rest], worker_pid, responses) when is_pid(worker_pid) do
updated_responses = case ClientWorker.redis_command(worker_pid, current) do
{:ok, res} ->
[%{result: res} | responses]

{:error, error} ->
[
%{
error: error.message
} | responses
]
end
defp do_dispatch_command_transaction_array([current | rest], worker_pid, responses)
when is_pid(worker_pid) do
updated_responses =
case ClientWorker.redis_command(worker_pid, current) do
{:ok, res} ->
[%{result: res} | responses]

{:error, error} ->
[
%{
error: error.message
}
| responses
]
end

do_dispatch_command_transaction_array(rest, worker_pid, updated_responses)
end
Expand All @@ -154,16 +176,34 @@ defmodule Srh.Http.CommandHandler do
{:ok, %{result: res}}

{:error, error} ->
{
:redis_error,
%{
error: error.message
}
}
decode_error(error, srh_id)
end

{:error, msg} ->
{:server_error, msg}
end
end

# Figure out if it's an actual Redis error or a Redix error
defp decode_error(error, srh_id) do
case error do
%{reason: :closed} ->
IO.puts(
"WARNING: SRH was unable to connect to the Redis server. Please make sure it is running, and the connection information is correct. SRH ID: #{srh_id}"
)

{
:connection_error,
"SRH: Unable to connect to the Redis server. See SRH logs for more information."
}

_ ->
{
:redis_error,
%{
error: error.message
}
}
end
end
end
1 change: 0 additions & 1 deletion lib/srh/http/request_validator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ defmodule Srh.Http.RequestValidator do
defp do_validate_encoding_header([first_item | rest]) do
case first_item do
"base64" -> {:ok, true}

_ -> do_validate_encoding_header(rest)
end
end
Expand Down
21 changes: 14 additions & 7 deletions lib/srh/http/result_encoder.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
defmodule Srh.Http.ResultEncoder do

# Authentication errors don't get encoded, we need to skip over those
def encode_response({:not_authorized, message}) do
{:not_authorized, message}
Expand All @@ -10,6 +9,10 @@ defmodule Srh.Http.ResultEncoder do
{:redis_error, error_result_map}
end

def encode_response({:connection_error, error_result_map}) do
{:connection_error, error_result_map}
end

# List-based responses, they will contain multiple entries
# It's important to note that this is DIFFERENT from a list of values,
# as it's a list of separate command responses. Each is a map that either
Expand All @@ -27,12 +30,16 @@ defmodule Srh.Http.ResultEncoder do
## RESULT LIST ENCODING ##

defp encode_response_list([current | rest], encoded_responses) do
encoded_current_entry = case current do
%{result: value} ->
%{result: encode_result_value(value)} # Encode the value
%{error: error_message} ->
%{error: error_message} # We don't encode errors
end
encoded_current_entry =
case current do
%{result: value} ->
# Encode the value
%{result: encode_result_value(value)}

%{error: error_message} ->
# We don't encode errors
%{error: error_message}
end

encode_response_list(rest, [encoded_current_entry | encoded_responses])
end
Expand Down
14 changes: 6 additions & 8 deletions lib/srh/redis/client_registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,9 @@ defmodule Srh.Redis.ClientRegistry do
{:ok, pid},
%{
state_update
|
currently_borrowed_pids:
[pid | state_update.currently_borrowed_pids]
|> Enum.uniq()
| currently_borrowed_pids:
[pid | state_update.currently_borrowed_pids]
|> Enum.uniq()
}
}
end
Expand All @@ -73,10 +72,9 @@ defmodule Srh.Redis.ClientRegistry do
:noreply,
%{
state
|
worker_pids:
[pid | state.worker_pids]
|> Enum.uniq()
| worker_pids:
[pid | state.worker_pids]
|> Enum.uniq()
}
}
end
Expand Down
4 changes: 3 additions & 1 deletion lib/srh/redis/client_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ defmodule Srh.Redis.ClientWorker do
{:ok, res} ->
{:reply, {:ok, res}, state}

# Both connection errors and Redis command errors will be handled here
{:error, res} ->
{:reply, {:error, res}, state}
end
Expand All @@ -52,7 +53,6 @@ defmodule Srh.Redis.ClientWorker do
{:noreply, state}
end

# TODO: Handle host / port connections
def handle_info(
:create_connection,
%{
Expand All @@ -62,6 +62,8 @@ defmodule Srh.Redis.ClientWorker do
} = state
)
when is_binary(connection_string) do
# NOTE: Redix only seems to open the connection when the first command is sent
# This means that this will return :ok even if the connection string may not actually be connectable
{:ok, pid} = Redix.start_link(connection_string)
{:noreply, %{state | redix_pid: pid}}
end
Expand Down