pgcat icon indicating copy to clipboard operation
pgcat copied to clipboard

The plpgsql funtion write data to some table need force route to primary database

Open mingjunyang opened this issue 2 years ago • 2 comments

Is your feature request related to a problem? Please describe. My plpgsql function need insert data to table, but I think current pgcat did not know my plpgsql function need only route to primary database.

CREATE TABLE write_test_table (
    ID bigserial PRIMARY KEY NOT NULL,
    info1 int,
    info2 int
);

CREATE OR REPLACE FUNCTION test_write_test_table ()
    RETURNS void
    AS $$
DECLARE
    t_count int;
BEGIN
    t_count := 0;
    LOOP
        RAISE NOTICE '%', t_count;
        t_count := t_count + 1;
        INSERT INTO write_test_table (info1, info2)
            VALUES (t_count, t_count * t_count);
        EXIT
        WHEN t_count = 10000;
    END LOOP;
END;
$$
LANGUAGE plpgsql;

my pgcat.toml

[general]
host = "0.0.0.0"
port = 6432

# Number of worker threads the Runtime will use (4 by default).
worker_threads = 8
prepared_statements = true
prepared_statements_cache_size = 500

admin_username = "postgres"
admin_password = "postgres"

[plugins]

[plugins.query_logger]
enabled = true

[pools.demo_db]
pool_mode = "transaction"
default_role = "primary"
query_parser_enabled = true
primary_reads_enabled = true
sharding_function = "pg_bigint_hash"

shards.0 = { servers = [["10.1.1.1", 5432, "primary"],["10.1.1.2", 5432, "replica"]], database = "demo_db"}
users.0 = { username = "postgres", password = "postgres", pool_size = 50 }
 

Describe the solution you'd like Should we create a plugin , detect the query and force some appointed query route to primary database. like those

[plugins.force_to_primary]
enabled = true
querys = [
  "select test_write_test_table()",
  "select test_write_test_table_xxx($1,$2)",
  regex".*\s+test_write_test_table\s+.*",
]

Or, have any implement the same function? Or, I can set some config to got this function?

mingjunyang avatar Jun 20 '23 11:06 mingjunyang

That's an option. Another is to tell PgCat to route the query to the primary:

SET SERVER ROLE TO 'primary';

and then call your function. Once you're done, run:

SET SERVER ROLE TO 'auto';

levkk avatar Jun 20 '23 15:06 levkk

change query with SET SERVER ROLE TO 'primary'; , this need change my client code, it's some difficulties.

Maybe I can implement this code. But I did not know how deal with PluginOutput::OverwriteRouter and

match plugin_result {
    Ok(PluginOutput::Deny(error)) => {
        error_response(&mut self.write, &error).await?;
        continue;
    }

    Ok(PluginOutput::Intercept(result)) => {
        write_all(&mut self.write, result).await?;
        continue;
    }
    Ok(PluginOutput::OverwriteRouter(result)) => {
        query_router.update_role(result);
        continue;
    }

    _ => (),
};
diff --git a/src/client.rs b/src/client.rs
index 6c0d06f..4ab6d71 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -854,6 +854,10 @@ where
                                     write_all(&mut self.write, result).await?;
                                     continue;
                                 }
+                                Ok(PluginOutput::OverwriteRouter(result)) => {
+                                    query_router.update_role(result);
+                                    continue;
+                                }
 
                                 _ => (),
                             };
@@ -1371,6 +1375,10 @@ where
                                 continue;
                             }
 
+                            Some(PluginOutput::OverwriteRouter(role)) => {
+                                query_router.update_role(role);
+                            }
+
                             _ => (),
                         };
 
diff --git a/src/config.rs b/src/config.rs
index a2314fc..78470ec 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -754,6 +754,7 @@ pub struct Plugins {
     pub table_access: Option<TableAccess>,
     pub query_logger: Option<QueryLogger>,
     pub prewarmer: Option<Prewarmer>,
+    pub route_query_to_primary : Option<RouteQueryToPrimary>,
 }
 
 impl std::fmt::Display for Plugins {
@@ -792,6 +793,13 @@ pub struct Prewarmer {
     pub queries: Vec<String>,
 }
 
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, Hash, Eq)]
+pub struct RouteQueryToPrimary {
+    pub enabled: bool,
+    pub force:bool,
+    pub qeurys: String,
+}
+
 impl Intercept {
     pub fn substitute(&mut self, db: &str, user: &str) {
         for (_, query) in self.queries.iter_mut() {
diff --git a/src/plugins/mod.rs b/src/plugins/mod.rs
index 5ef6009..f6d3125 100644
--- a/src/plugins/mod.rs
+++ b/src/plugins/mod.rs
@@ -12,8 +12,9 @@ pub mod intercept;
 pub mod prewarmer;
 pub mod query_logger;
 pub mod table_access;
+pub mod route_query_to_primary;
 
-use crate::{errors::Error, query_router::QueryRouter};
+use crate::{errors::Error, query_router::QueryRouter, config::Role};
 use async_trait::async_trait;
 use bytes::BytesMut;
 use sqlparser::ast::Statement;
@@ -21,6 +22,7 @@ use sqlparser::ast::Statement;
 pub use intercept::Intercept;
 pub use query_logger::QueryLogger;
 pub use table_access::TableAccess;
+pub use route_query_to_primary::RouteQueryToPrimary;
 
 #[derive(Clone, Debug, PartialEq)]
 pub enum PluginOutput {
@@ -28,6 +30,7 @@ pub enum PluginOutput {
     Deny(String),
     Overwrite(Vec<Statement>),
     Intercept(BytesMut),
+    OverwriteRouter(Role)
 }
 
 #[async_trait]
diff --git a/src/query_router.rs b/src/query_router.rs
index 126b813..a40ae22 100644
--- a/src/query_router.rs
+++ b/src/query_router.rs
@@ -136,6 +136,11 @@ impl QueryRouter {
         &self.pool_settings
     }
 
+    pub fn update_role<'a>(&'a mut self,role: Role) -> bool {
+        self.active_role=Some(role);
+        true
+    }
+
     /// Try to parse a command and execute it.
     pub fn try_execute_command(&mut self, message_buffer: &BytesMut) -> Option<(Command, String)> {
         let mut message_cursor = Cursor::new(message_buffer);

src/plugins/route_query_to_primary.rs

use async_trait::async_trait;
use regex::Regex;
use sqlparser::ast::{visit_expressions, Statement};

use crate::{
    errors::Error,
    plugins::{Plugin, PluginOutput},
    query_router::QueryRouter,
};

use log::info;

use core::ops::ControlFlow;

pub struct RouteQueryToPrimary<'a> {
    pub enabled: bool,
    pub force: bool,
    pub regex: &'a Regex,
}

#[async_trait]
impl<'a> Plugin for RouteQueryToPrimary<'a> {
    async fn run(
        &mut self,
        _query_router: &QueryRouter,
        ast: &Vec<Statement>,
    ) -> Result<PluginOutput, Error> {
        if !self.enabled {
            return Ok(PluginOutput::Allow);
        }

        let mut force_route = false;

        visit_expressions(ast, |express| {
            let expr = express.to_string();
            if self.regex.is_match(&expr) {
                force_route = true;
            }
            ControlFlow::<()>::Continue(())
        });

        if force_route && self.force {
            info!("force route query to primary: {}", force_route);
            Ok(PluginOutput::OverwriteRouter(crate::config::Role::Primary))
        } else {
            Ok(PluginOutput::Allow)
        }
    }
}

this code only a simply conceive, and not test running . I will try later.

mingjunyang avatar Jun 21 '23 05:06 mingjunyang