alog
                        All - Ecto.Adapters.SQL.Connection
Part of #38
https://hexdocs.pm/ecto_sql/Ecto.Adapters.SQL.Connection.html#c:all/1
We need to implement this callback so that it returns the latest version of all rows that match the query parameter.
@RobStallion has already done some research into how we can modify the passed-in query, which we can then pass on to the Ecto.Adapters.Postgres.Connection.all function:
https://github.com/RobStallion/alog_adapter/issues/1 https://stackoverflow.com/questions/54690701/is-there-a-way-to-ensure-where-clause-happens-after-distinct
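To pin down what "the latest version of all rows" means before touching SQL, here is a minimal in-memory sketch in plain Elixir (no Ecto). It assumes, for illustration only, that rows are maps with entry_id, inserted_at and deleted keys, like the alog tables discussed in this issue, and it uses integers as stand-in timestamps. It keeps the newest row per entry_id and then hides entries whose newest row is deleted, which is what the SQL version has to reproduce.

```elixir
defmodule AlogSemantics do
  # Hypothetical in-memory model of the alog "all" semantics:
  # every update inserts a new row with the same entry_id, so the current
  # state of an entry is its most recently inserted row.
  def latest(rows) do
    rows
    |> Enum.group_by(& &1.entry_id)
    |> Enum.map(fn {_entry_id, versions} -> Enum.max_by(versions, & &1.inserted_at) end)
    # An entry whose latest version is marked deleted is hidden entirely
    |> Enum.reject(& &1.deleted)
    |> Enum.sort_by(& &1.entry_id)
  end
end

# Integers stand in for timestamps in this sketch
rows = [
  %{entry_id: "a", name: "tea", inserted_at: 1, deleted: false},
  %{entry_id: "a", name: "green tea", inserted_at: 2, deleted: false},
  %{entry_id: "b", name: "coffee", inserted_at: 1, deleted: false},
  %{entry_id: "b", name: "coffee", inserted_at: 3, deleted: true}
]

AlogSemantics.latest(rows)
```

With real NaiveDateTime values the comparison has to be date-aware. One way is to pass NaiveDateTime as the sorter to Enum.max_by/3 (Elixir 1.10+), because plain term comparison of structs does not order dates correctly.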
@Danwhy I just got the following error when trying to run a rollback command on the module where I have been testing this dummy adapter...
** (Postgrex.Error) ERROR 42703 (undefined_column) column s0.comment_id_no does not exist
    query: SELECT DISTINCT ON (s0."comment_id_no") s0."version"::bigint FROM "schema_migrations" AS s0 FOR UPDATE
    (ecto_sql) lib/ecto/adapters/sql.ex:624: Ecto.Adapters.SQL.raise_sql_call_error/1
    (ecto_sql) lib/ecto/adapters/sql.ex:557: Ecto.Adapters.SQL.execute/5
    (ecto) lib/ecto/repo/queryable.ex:147: Ecto.Repo.Queryable.execute/4
    (ecto) lib/ecto/repo/queryable.ex:18: Ecto.Repo.Queryable.all/3
    (ecto_sql) lib/ecto/migrator.ex:316: anonymous fn/3 in Ecto.Migrator.lock_for_migrations/3
    (ecto_sql) lib/ecto/adapters/sql.ex:820: anonymous fn/3 in Ecto.Adapters.SQL.checkout_or_transaction/4
    (db_connection) lib/db_connection.ex:1355: DBConnection.run_transaction/4
    (ecto_sql) lib/ecto/adapters/sql.ex:727: Ecto.Adapters.SQL.lock_for_migrations/5
    (ecto_sql) lib/ecto/migrator.ex:318: Ecto.Migrator.lock_for_migrations/3
    (ecto_sql) lib/mix/tasks/ecto.rollback.ex:106: anonymous fn/4 in Mix.Tasks.Ecto.Rollback.run/2
    (elixir) lib/enum.ex:765: Enum."-each/2-lists^foreach/1-0-"/2
    (elixir) lib/enum.ex:765: Enum.each/2
    (mix) lib/mix/task.ex:316: Mix.Task.run_task/3
    (mix) lib/mix/cli.ex:79: Mix.CLI.run_task/2
    (elixir) lib/code.ex:767: Code.require_file/2
This is the line that is causing the issue - https://github.com/RobStallion/alog_adapter/blob/master/lib/connection.ex#L36
This appears to be because not all the tables created in the module contain the column "comment_id_no" (equivalent to entry_id).
This could be a potential issue in trying to create an ecto adapter.
@RobStallion Yeah I'd assume that the problem is that the schema_migrations table is automatically created. But we can have full control over all tables being created, so we can either make sure the "entry_id" is on all created tables, or have a clause that catches the exceptions.
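The "clause" option could be as simple as function heads that pattern-match on the source table name, so that the alog logic is only applied to user tables. The module and function names below are hypothetical, and keying on the table name is an assumption for illustration. "schema_migrations" is the table named in the error above.

```elixir
defmodule AlogTableFilter do
  # Hypothetical helper: tables that Ecto creates itself (such as the
  # migrations table) have no entry_id column, so the alog
  # DISTINCT ON (entry_id) wrapper must not be applied to them.
  @ecto_tables ["schema_migrations"]

  def alog_table?(table) when table in @ecto_tables, do: false
  def alog_table?(table) when is_binary(table), do: true
end
```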
I've been having a look into the flow of how the Repo.all query is called:
Repo.all -> Ecto.Repo.Queryable.all -> Ecto.Repo.Queryable.execute -> Ecto.Query.Planner.query (Ecto.Query.Planner.plan) -> adapter.execute (Ecto.Adapters.SQL.execute) -> adapter.execute! -> sql_call
It looks like the sources are added in the Ecto.Query.Planner.plan call, so we may be able to call this function ourselves to prepare the query in our all function
I was doing something similar yesterday to see how the all function is called. I was a bit confused by the execute function, because the adapter also has one but with only 4 parameters. I figured out later that the one being used is the one from Ecto.Adapters.SQL, which takes 5 parameters.
See also https://github.com/dwyl/learn-elixir/issues/116 for the best way to trace function calls.
The sources appear to be used (but not defined) in the Ecto.Adapters.SQL.execute function, via put_source:
  def execute(adapter_meta, query_meta, prepared, params, opts) do
    %{num_rows: num, rows: rows} =
      execute!(adapter_meta, prepared, params, put_source(opts, query_meta))
    {num, rows}
  end
https://github.com/elixir-ecto/ecto_sql/blob/3a72eb111f8613c8eada296bcd3dd89883a52b5a/lib/ecto/adapters/sql.ex#L554-L560
I think by the time it gets to that function, our adapter's all function has already been called; that is just where the SQL it generates is actually executed.
Describing the call trace of the Repo.all function, to find out exactly where the all function from our adapter is called and with which parameter structure:
Repo.all(queryable, opts):
This function is defined in the Ecto.Repo module on line 237: https://github.com/elixir-ecto/ecto/blob/2d564df57d29ef021f564af36e4b3ab86f902554/lib/ecto/repo.ex#L237-L239
  def all(queryable, opts \\ []) do
    Ecto.Repo.Queryable.all(__MODULE__, queryable, opts)
  end
Ecto.Repo.Queryable.all(name, queryable, opts) is defined on line 11: https://github.com/elixir-ecto/ecto/blob/2d564df57d29ef021f564af36e4b3ab86f902554/lib/ecto/repo/queryable.ex#L11
  def all(name, queryable, opts) when is_list(opts) do
    query =
      queryable
      |> Ecto.Queryable.to_query
      |> Ecto.Query.Planner.ensure_select(true)
      |> attach_prefix(opts)
    execute(:all, name, query, opts) |> elem(1)
  end
This function prepares the query that will be passed to the execute function.
Ecto.Queryable.to_query(data): "Converts the given data into an Ecto.Query", see the doc https://hexdocs.pm/ecto/Ecto.Queryable.html#to_query/1 and the code https://github.com/elixir-ecto/ecto/blob/v3.0.7/lib/ecto/queryable.ex#L9. Depending on the type of the parameter/data, the to_query function creates the corresponding query struct.
Ecto.Query.Planner.ensure_select(true): Ecto.Query.Planner "Normalizes a query and its parameters.". The ensure_select function is defined here: https://github.com/elixir-ecto/ecto/blob/2d564df57d29ef021f564af36e4b3ab86f902554/lib/ecto/query/planner.ex#L738-L759. This function is "Used for customizing the query returning result." In our case the second argument is true, so when the query has no select yet, ensure_select(query, true) updates the query struct to define the select value:
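Ecto.Queryable is an Elixir protocol, which is why to_query/1 can accept different kinds of data, for example a schema module or a table name string. The toy protocol below is not Ecto's code. ToyQuery, ToyQueryable and the Comments stub are hypothetical stand-ins that show the dispatch idea for three input types: an existing query, a table name and a schema module exposing __schema__(:source), as Ecto schemas do.

```elixir
defmodule ToyQuery do
  # Hypothetical, heavily simplified stand-in for %Ecto.Query{}
  defstruct from: nil, select: nil, prefix: nil
end

defprotocol ToyQueryable do
  @doc "Converts the given data into a %ToyQuery{}"
  def to_query(data)
end

defimpl ToyQueryable, for: ToyQuery do
  # Already a query: return it unchanged
  def to_query(query), do: query
end

defimpl ToyQueryable, for: BitString do
  # A bare table name: build a query with no schema attached
  def to_query(table), do: %ToyQuery{from: {table, nil}}
end

defimpl ToyQueryable, for: Atom do
  # A schema module: ask it for its table name
  def to_query(module), do: %ToyQuery{from: {module.__schema__(:source), module}}
end

defmodule Comments do
  # Hypothetical schema stub exposing __schema__/1 like Ecto schemas do
  def __schema__(:source), do: "comments"
end
```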
  def ensure_select(%{select: nil} = query, true) do
    %{query | select: %SelectExpr{expr: {:&, [], [0]}, line: __ENV__.line, file: __ENV__.file}}
  end
attach_prefix(query, opts): the attach_prefix function is a helper defined in Ecto.Repo.Queryable as
  defp attach_prefix(query, opts) do
    case Keyword.fetch(opts, :prefix) do
      {:ok, prefix} -> %{query | prefix: prefix}
      :error -> query
    end
  end
It adds the prefix defined in the options, if there is one, to the query. The options are a keyword list, i.e. a list of tuples such as [opt1: true, opt2: "test"], which is shorthand for [{:opt1, true}, {:opt2, "test"}].
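Because opts is a keyword list, attach_prefix/2 only needs Keyword.fetch/2. Here is a self-contained copy of the same logic, with a plain map standing in for the %Ecto.Query{} struct (an assumption made to avoid depending on Ecto). PrefixSketch is a hypothetical module name.

```elixir
defmodule PrefixSketch do
  # Same logic as Ecto.Repo.Queryable.attach_prefix/2, applied to a plain map
  def attach_prefix(query, opts) do
    case Keyword.fetch(opts, :prefix) do
      {:ok, prefix} -> %{query | prefix: prefix}
      :error -> query
    end
  end
end

# Keyword syntax is sugar for a list of two-element {atom, value} tuples
true = [prefix: "tenant_a", timeout: 5_000] == [{:prefix, "tenant_a"}, {:timeout, 5_000}]
```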
execute(:all, name, query, opts) |> elem(1): this is the last call. Here name is the repo module. Ecto.Repo.all calls Ecto.Repo.Queryable.all(__MODULE__, queryable, opts), and because that definition is injected into our repo by use Ecto.Repo, __MODULE__ is our own repo (e.g. UsingAlogAdapter.Repo). The execute function is defined on line 130 of Ecto.Repo.Queryable: https://github.com/elixir-ecto/ecto/blob/2d564df57d29ef021f564af36e4b3ab86f902554/lib/ecto/repo/queryable.ex#L130-L155
  defp execute(operation, name, query, opts) when is_list(opts) do
    {adapter, %{cache: cache} = adapter_meta} = Ecto.Repo.Registry.lookup(name)
    {query_meta, prepared, params} = Planner.query(query, operation, cache, adapter, 0)
    case query_meta do
      %{select: nil} ->
        adapter.execute(adapter_meta, query_meta, prepared, params, opts)
      %{select: select, sources: sources, preloads: preloads} ->
        %{
          preprocess: preprocess,
          postprocess: postprocess,
          take: take,
          assocs: assocs,
          from: from
        } = select
        preprocessor = preprocessor(from, preprocess, adapter)
        {count, rows} = adapter.execute(adapter_meta, query_meta, prepared, params, opts)
        postprocessor = postprocessor(from, postprocess, take, adapter)
        {count,
          rows
          |> Ecto.Repo.Assoc.query(assocs, sources, preprocessor)
          |> Ecto.Repo.Preloader.query(name, preloads, take, postprocessor, opts)}
    end
  end
Ecto.Repo.Registry.lookup(name) checks that the repo exists and returns its adapter and adapter metadata:
  def lookup(repo) when is_atom(repo) do
    # returns the pid of the repo process, or nil, in which case the line below raises
    GenServer.whereis(repo)
    |> Kernel.||(raise "could not lookup #{inspect repo} because it was not started or it does not exist")
    |> lookup()
  end

  # uses ETS to retrieve the adapter associated with the repo's pid
  def lookup(pid) when is_pid(pid) do
    :ets.lookup_element(__MODULE__, pid, 3)
  end
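The second lookup/1 clause is a plain ETS read: :ets.lookup_element/3 returns the element at a given (1-based) position of the stored tuple. The sketch below reproduces that with a private table. The table name, the stored tuple and MyAdapter are hypothetical, chosen so that position 3 holds an {adapter, adapter_meta} pair like the one destructured in execute/4 above.

```elixir
# Hypothetical registry table; Ecto's real table is named after its registry module
table = :ets.new(:toy_repo_registry, [:set, :public])

# Store {key, repo_name, {adapter, adapter_meta}}; the key is the repo's pid
repo_pid = self()
:ets.insert(table, {repo_pid, MyApp.Repo, {MyAdapter, %{cache: :some_ref}}})

# Position 3 (1-based) is the {adapter, adapter_meta} tuple
{adapter, adapter_meta} = :ets.lookup_element(table, repo_pid, 3)
```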
Planner.query(query, operation, cache, adapter, 0) is defined at https://github.com/elixir-ecto/ecto/blob/d2bca3d36476cc92d2e761ab2d99c130c9ad83d5/lib/ecto/query/planner.ex#L127
  def query(query, operation, cache, adapter, counter) do
    {query, params, key} = plan(query, operation, adapter, counter)
    query_with_cache(key, query, operation, cache, adapter, counter, params)
  end
The Ecto.Query.Planner.query function is calling Ecto.Query.Planner.plan function https://github.com/elixir-ecto/ecto/blob/d2bca3d36476cc92d2e761ab2d99c130c9ad83d5/lib/ecto/query/planner.ex#L206
  @doc """
  Prepares the query for cache.
  This means all the parameters from query expressions are
  merged into a single value and their entries are pruned
  from the query.
  This function is called by the backend before invoking
  any cache mechanism.
  """
  def plan(query, operation, adapter, counter) do
    query
    |> plan_sources(adapter)
    |> plan_assocs
    |> plan_combinations(operation, adapter, counter)
    |> plan_cache(operation, adapter, counter)
  rescue
    e ->
      # Reraise errors so we ignore the planner inner stacktrace
      filter_and_reraise e, System.stacktrace
  end
So I think the plan function is just preparing the query.
Then query_with_cache(key, query, operation, cache, adapter, counter, params) is called:
  defp query_with_cache(key, query, operation, cache, adapter, counter, params) do
    case query_lookup(key, query, operation, cache, adapter, counter) do
      {_, select, prepared} ->
        {build_meta(query, select), {:nocache, prepared}, params}
      {_key, :cached, select, cached} ->
        update = &cache_update(cache, key, &1)
        reset = &cache_reset(cache, key, &1)
        {build_meta(query, select), {:cached, update, reset, cached}, params}
      {_key, :cache, select, prepared} ->
        update = &cache_update(cache, key, &1)
        {build_meta(query, select), {:cache, update, prepared}, params}
    end
  end
which calls query_lookup(key, query, operation, cache, adapter, counter). This function checks whether the query already exists in the ETS cache; otherwise it prepares the query:
  defp query_lookup(key, query, operation, cache, adapter, counter) do
    case :ets.lookup(cache, key) do
      [term] -> term
      [] -> query_prepare(query, operation, adapter, counter, cache, key)
    end
  end
Then query_prepare(query, operation, adapter, counter, cache, key):
  defp query_prepare(query, operation, adapter, counter, cache, key) do
    case query_without_cache(query, operation, adapter, counter) do
      {:cache, select, prepared} ->
        cache_insert(cache, key, {key, :cache, select, prepared})
      {:nocache, _, _} = nocache ->
        nocache
    end
  end
Which runs query_without_cache(query, operation, adapter, counter):
  defp query_without_cache(query, operation, adapter, counter) do
    {query, select} = normalize(query, operation, adapter, counter)
    {cache, prepared} = adapter.prepare(operation, query)
    {cache, select, prepared}
  end
And we can see that after the query has been normalised the adapter.prepare(operation, query) function is called. This function is defined in Ecto.Adapters.SQL https://github.com/elixir-ecto/ecto_sql/blob/3a72eb111f8613c8eada296bcd3dd89883a52b5a/lib/ecto/adapters/sql.ex#L134
  def prepare(:all, query) do
    {:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.all(query))}}
  end
We can see that @conn.all(query) is called, where @conn is defined as __MODULE__.Connection, which is our Connection adapter module!
I think I managed to trace the lifecycle of the Repo.all function. Other helper functions are called along the way, but this trace shows that the query is first planned (which also builds the cache key), then looked up in the ETS cache, and only if it is not there is it normalised and handed to the adapter, which calls our Connection.all/1. Postgrex is then responsible for sending the query to Postgres and retrieving the result. Now that we have a better idea of where the query is transformed, we can try to copy or adapt some of the helper functions above to add the SQL clauses needed for the alog logic (unique uuid, latest timestamp, ...).
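The @conn indirection is an ordinary module attribute holding a module name. The toy modules below (MyAdapter, MyAdapter.Connection and the stub SQL they return) are hypothetical. They show how prepare(:all, query) ends up in the adapter's own Connection.all/1, and how the iodata it returns is flattened with IO.iodata_to_binary/1.

```elixir
defmodule MyAdapter.Connection do
  # Hypothetical stand-in for AlogAdapter.Connection.all/1: returns iodata
  def all(%{table: table}), do: ["SELECT * FROM ", "\"", table, "\"" | " AS t0"]
end

defmodule MyAdapter do
  # __MODULE__.Connection expands to MyAdapter.Connection at compile time
  @conn __MODULE__.Connection

  # Same shape as Ecto.Adapters.SQL's prepare(:all, query)
  def prepare(:all, query) do
    {:cache, {System.unique_integer([:positive]), IO.iodata_to_binary(@conn.all(query))}}
  end
end
```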
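The ETS step in that lifecycle boils down to "look the key up; on a miss, prepare and insert". Below is a stripped-down sketch of query_lookup/query_prepare. A hypothetical prepare_fun stands in for normalize plus adapter.prepare, the :nocache branch is ignored, and the stored tuple is simplified compared to Ecto's {key, :cache, select, prepared}.

```elixir
defmodule ToyQueryCache do
  # Returns {:cached, prepared} on a hit; on a miss it prepares, stores and
  # returns {:cache, prepared}, mirroring query_lookup/query_prepare.
  def lookup_or_prepare(cache, key, prepare_fun) do
    case :ets.lookup(cache, key) do
      [{^key, prepared}] ->
        {:cached, prepared}

      [] ->
        prepared = prepare_fun.()
        :ets.insert(cache, {key, prepared})
        {:cache, prepared}
    end
  end
end

cache = :ets.new(:toy_query_cache, [:set, :public])
```

One consequence worth keeping in mind for the adapter: on a cache hit the prepare step, and therefore our all/1, is not run again for the same query key.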
Trying to recreate a similar subquery to the one currently on alog:
    sub = from(m in subquery(a1), distinct: m.last_name, order_by: m.first_name, select: m)
    query = from(a in subquery(sub), where: not a.deleted, select: a)
    Ecto.Adapters.Postgres.Connection.all(query)
where a1 is the query the adapter all function is getting

The create_names function is defined here: https://github.com/elixir-ecto/ecto_sql/blob/3a72eb111f8613c8eada296bcd3dd89883a52b5a/lib/ecto/adapters/postgres/connection.ex#L654
    defp create_names(%{sources: sources}) do
      create_names(sources, 0, tuple_size(sources)) |> List.to_tuple()
    end
The error comes from tuple_size(sources): it seems that sources is nil.
I think the subquery function returns a query struct that differs from the one the adapter receives after Ecto's initial planning steps, which makes them incompatible.
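The failure is easy to reproduce outside Ecto: tuple_size/1 only accepts tuples, so a query whose sources field was never filled in by the planner raises ArgumentError. The toy create_names/1 below is hypothetical and is not the Postgres adapter's implementation. It follows the same recursive shape, but only derives aliases such as c0 and d1 from the first letter of each table name, similar to the aliases in the generated SQL above.

```elixir
defmodule ToyNames do
  # Builds one alias per planned source, e.g. {"comments", schema, prefix} -> "c0"
  def create_names(%{sources: sources}) do
    create_names(sources, 0, tuple_size(sources)) |> List.to_tuple()
  end

  defp create_names(sources, pos, limit) when pos < limit do
    {table, _schema, _prefix} = elem(sources, pos)
    [String.first(table) <> Integer.to_string(pos) | create_names(sources, pos + 1, limit)]
  end

  defp create_names(_sources, pos, pos), do: []
end
```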
Regarding the earlier idea that the sources are added in the Ecto.Query.Planner.plan call, so that we may be able to call this function ourselves to prepare the query in our all function: I'm looking at whether we can reuse it.
Trying to apply plan or plan_sources from Ecto.Query.Planner returns other errors.
I will instead:
- understand how the Ecto.Query type/struct is created, see https://hexdocs.pm/ecto/Ecto.Query.html#t:t/0
- then try to replicate the logic of the Postgres adapter and add the missing elements we need for alog, see https://github.com/elixir-ecto/ecto_sql/blob/3a72eb111f8613c8eada296bcd3dd89883a52b5a/lib/ecto/adapters/postgres/connection.ex#L106-L124
Looking at @SimonLab's comment, the following line appears to be where the adapter is called...
{count, rows} = adapter.execute(adapter_meta, query_meta, prepared, params, opts)
This calls the following function in the adapter...
def execute(adapter_meta, query_meta, query, params, opts) do
  Ecto.Adapters.SQL.execute(adapter_meta, query_meta, query, params, opts)
end
This function is defined here.
The arguments passed to this function are...
[
  adapter_meta: %{
    cache: #Reference<0.3974704928.2992242689.138205>,
    opts: [timeout: 15000, pool_size: 10, pool: DBConnection.ConnectionPool],
    pid: #PID<0.2848.0>,
    sql: AlogAdapter.Connection,
    telemetry: {UsingAlogAdapter.Repo, :debug, [],
     [:using_alog_adapter, :repo, :query]}
  },
  opts: [],
  params: [],
  prepared: {:cache,
   #Function<29.104601620/1 in Ecto.Query.Planner.query_with_cache/7>,
   {134695,
    "SELECT c0.\"id\", c0.\"comment\", c0.\"comment_id_no\", c0.\"show\", c0.\"cid\", c0.\"entry_id\", c0.\"inserted_at\", c0.\"updated_at\" FROM \"comments\" AS c0"}},
  query_meta: %{
    preloads: [],
    select: %{
      assocs: [],
      from: {:any,
       {:source, {"comments", UsingAlogAdapter.Comments}, nil,
        [
          id: :id,
          comment: :string,
          comment_id_no: :string,
          show: :boolean,
          cid: :string,
          entry_id: :string,
          inserted_at: :naive_datetime,
          updated_at: :naive_datetime
        ]}},
      postprocess: {:source, :from},
      preprocess: [source: :from],
      take: []
    },
    sources: {{"comments", UsingAlogAdapter.Comments, nil}}
  }
]
To clarify, the original call was
Repo.all(Comments)
Next step
I am going to pass a subquery to Repo.all and compare the two sets of arguments that are passed to the adapter's execute/5 function.
[
  adapter_meta: %{
    cache: #Reference<0.872129942.2461401090.223629>,
    opts: [timeout: 15000, pool_size: 10, pool: DBConnection.ConnectionPool],
    pid: #PID<0.329.0>,
    sql: AlogAdapter.Connection,
    telemetry: {UsingAlogAdapter.Repo, :debug, [],
     [:using_alog_adapter, :repo, :query]}
  },
  opts: [],
  params: [],
  prepared: {:cache,
   #Function<29.104601620/1 in Ecto.Query.Planner.query_with_cache/7>,
   {103,
    "SELECT DISTINCT ON (c0.\"cid\") c0.\"id\", c0.\"comment\", c0.\"comment_id_no\", c0.\"show\", c0.\"cid\", c0.\"entry_id\", c0.\"inserted_at\", c0.\"updated_at\" FROM \"comments\" AS c0 WHERE (c0.\"comment_id_no\" = '1') ORDER BY c0.\"cid\", c0.\"inserted_at\" DESC"}},
  query_meta: %{
    preloads: [],
    select: %{
      assocs: [],
      from: {:any,
       {:source, {"comments", UsingAlogAdapter.Comments}, nil,
        [
          id: :id,
          comment: :string,
          comment_id_no: :string,
          show: :boolean,
          cid: :string,
          entry_id: :string,
          inserted_at: :naive_datetime,
          updated_at: :naive_datetime
        ]}},
      postprocess: {:source, :from},
      preprocess: [source: :from],
      take: []
    },
    sources: {{"comments", UsingAlogAdapter.Comments, nil}}
  }
]
These are the arguments passed to the adapter's execute/5 function when all is called with the following subquery...
sub =
  from(c in Comments,
    distinct: c.cid,
    order_by: [desc: :inserted_at]
  )
query = from(c in sub, where: c.comment_id_no == "1")
Repo.all(query)
At first glance the arguments passed to our adapter look almost identical, with the OBVIOUS exception of the :prepared atom.
:prepared is the same as :query. I logged the arguments inside the function that the call Ecto.Adapters.SQL.execute(adapter_meta, query_meta, query, params, opts) invokes, i.e. def execute(adapter_meta, query_meta, prepared, params, opts). That is the only reason the function says query but the log says :prepared.
Hopefully this means we can somehow update the query that is passed to the adapter to add a subquery to it.
Will look into possible approaches for this.
My latest attempt was to try to reproduce the logic of the all function from the Postgres adapter:
    def all(query) do
      sources = create_names(query)
      {select_distinct, order_by_distinct} = distinct(query.distinct, sources, query)
      from = from(query, sources)
      select = select(query, select_distinct, sources)
      join = join(query, sources)
      where = where(query, sources)
      group_by = group_by(query, sources)
      having = having(query, sources)
      window = window(query, sources)
      combinations = combinations(query)
      order_by = order_by(query, order_by_distinct, sources)
      limit = limit(query, sources)
      offset = offset(query, sources)
      lock = lock(query.lock)
      [select, from, join, where, group_by, having, window, combinations, order_by, limit, offset | lock]
    end
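This function returns an improper list of improper lists of strings. I'm not certain exactly why improper lists are used here, but these nested lists are iodata, which Ecto.Adapters.SQL.prepare/2 flattens in one go with IO.iodata_to_binary/1. So I think it is an optimisation that avoids concatenating strings at every step.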
see https://hexdocs.pm/elixir/master/List.html "Some lists, called improper lists, do not have an empty list as the second element in the last cons cell"
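A quick way to see why these nested, improper lists are harmless: a list of binaries (or bytes) whose tail is a binary or [] is valid iodata. The pieces can therefore be nested and combined freely and flattened once at the end. The example below uses a fragment shaped like the one from/2 builds, with hypothetical table and alias values.

```elixir
# A FROM fragment shaped like the adapter's from/2 output:
# the tail " AS " | name makes it an improper list.
from_part = [" FROM ", "\"comments\"", " AS " | "c0"]
select_part = ["SELECT ", "c0.", "\"id\""]

# Combining fragments is just nesting lists; no strings are copied here
query_iodata = [select_part, from_part, " WHERE ", "c0.\"show\""]

# Flatten once, as Ecto.Adapters.SQL.prepare/2 does with the result of all/1
sql = IO.iodata_to_binary(query_iodata)
```

Note that length/1 raises ArgumentError on an improper list such as from_part, whereas IO.iodata_length/1 handles it.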
Then I looked at the private from function to understand the structure of the value returned by all:
    defp from(%{from: %{source: source}} = query, sources) do
      {from, name} = get_source(query, sources, 0, source)
      [" FROM ", from, " AS " | name]
    end
So it looks like it creates an improper list corresponding to the FROM part of an SQL query.
My idea is to create/transform one of these improper lists to be able to add a subquery.
I haven't managed to get to this point yet. One idea would be to IO.inspect, in the all function of the normal Postgres adapter, a query containing a subquery to understand how it is formed, and then try to reproduce the logic with our specific alog subquery.
Then try to repeat this process for the distinct part of the alog query.
The code that will enable access to the all query can be found here.
At the moment I am unsure if this function is only called when Repo.all is called or if it is called for other Repo functions as well.
Running the tests runs the migrations, which run Repo.all. At the moment we are adding DISTINCT ON to all queries, but I don't think it is needed for the migrations:

To get the different parts of the query (i.e. table name, fields, table alias...) I was using String.split, but the code became tedious and not very readable, so I'm going to look at replacing it with a regex. This will let me get the table name and create a switch to know whether Repo.all is run for a migration or for a user query.
Given a query similar to: we can retrieve the different part of the query with the following regex:
Regex.named_captures(~r/(\bSELECT\b)\s(?<fields>.*?)\sFROM\s(?<table_name>\S+)\sas\s(?<table_as>\S+)(?<rest_query>.*)/i, query)
- `?<name>` gives a name to the element we are trying to match
- `\b...\b` adds word boundaries to the match; for example, we want to match "select" only and not "selected"
- the `/i` at the end of the regex makes it case-insensitive: Ecto seems to generate upper-case keywords, but making the regex caseless is safer.
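Here is a runnable check of this parsing approach. The regex uses a lazy fields group (.*?) and restricts the table name and alias to \S+, so that the alias does not swallow the rest of the statement and rest_query actually captures the WHERE / FOR UPDATE part. The sample strings follow the SQL logged earlier in this issue. The migration one is the query from the rollback error with the DISTINCT ON part removed. SqlParts and migration_query?/1 are hypothetical names for the "switch" idea.

```elixir
defmodule SqlParts do
  # Splits a simple "SELECT fields FROM table AS alias rest" statement into named parts
  def parse(sql) do
    Regex.named_captures(
      ~r/\bSELECT\b\s(?<fields>.*?)\sFROM\s(?<table_name>\S+)\sAS\s(?<table_as>\S+)(?<rest_query>.*)/i,
      sql
    )
  end

  # Hypothetical switch: migration queries hit the schema_migrations table
  def migration_query?(sql) do
    case parse(sql) do
      %{"table_name" => "\"schema_migrations\""} -> true
      _ -> false
    end
  end
end

user_query =
  "SELECT c0.\"id\", c0.\"comment\" FROM \"comments\" AS c0 WHERE (c0.\"comment_id_no\" = '1')"

migration_query =
  "SELECT s0.\"version\"::bigint FROM \"schema_migrations\" AS s0 FOR UPDATE"
```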
I think the alog all query is now ready:

I've recreated the following structure for the query:
# SELECT
#   s0."id",
#   s0."name",
#   s0."entry_id",
#   s0."deleted",
#   s0."inserted_at",
#   s0."updated_at"
# FROM
#   (SELECT DISTINCT ON (d0."entry_id")
#       d0."id" AS "id"
#     , d0."name" AS "name"
#     , d0."entry_id" AS "entry_id"
#     , d0."deleted" AS "deleted"
#     , d0."inserted_at" AS "inserted_at"
#     , d0."updated_at" AS "updated_at"
#   FROM "drink_types" AS d0
#   ORDER BY d0."entry_id", d0."updated_at" DESC)
# AS s0 WHERE (NOT (s0."deleted"))
I will need to write more tests to see if any error occurs.
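To sanity-check the target shape, here is a sketch that assembles the query structure above as iodata from a table name, an inner alias and a list of field names. The module and function names are hypothetical, and this is not the adapter's implementation. It hard-codes the assumptions visible in the structure above: every alog table has entry_id, updated_at and deleted columns, and the outer alias is s0.

```elixir
defmodule AlogSqlSketch do
  # Hypothetical helper. Assumes every alog table has "entry_id", "updated_at"
  # and "deleted" columns, and uses "s0" as the outer alias.
  def latest_not_deleted(table, inner_as, fields) do
    quoted = Enum.map(fields, fn field -> "\"" <> field <> "\"" end)

    # d0."id" AS "id", d0."name" AS "name", ...
    inner_fields =
      quoted
      |> Enum.map(fn q -> [inner_as, ".", q, " AS ", q] end)
      |> Enum.intersperse(", ")

    # s0."id", s0."name", ...
    outer_fields =
      quoted
      |> Enum.map(fn q -> ["s0.", q] end)
      |> Enum.intersperse(", ")

    # DISTINCT ON keeps the first row per entry_id; ordering by
    # updated_at DESC makes that first row the newest version
    inner = [
      "SELECT DISTINCT ON (", inner_as, ".\"entry_id\") ", inner_fields,
      " FROM \"", table, "\" AS ", inner_as,
      " ORDER BY ", inner_as, ".\"entry_id\", ", inner_as, ".\"updated_at\" DESC"
    ]

    # The outer query drops entries whose latest version is marked deleted
    ["SELECT ", outer_fields, " FROM (", inner, ") AS s0 WHERE (NOT (s0.\"deleted\"))"]
    |> IO.iodata_to_binary()
  end
end
```

Called with "drink_types", "d0" and the six fields listed above, it produces the same structure on a single line. This could serve as one of the tests that compare the adapter's output against the expected SQL.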