
Create Backdoor-based Bayesian Diffusion Model

Open OrsonTyphanel93 opened this issue 1 year ago • 4 comments

BacKBayDiffMod : Backdoor-based Bayesian Diffusion Model

Hey guys @beat-buesser, @f4str, @GiulioZizzo! I've just developed the first-ever backdoor attack using a Bayesian approach, a diffusion model and a Fokker-Planck equation (reference 1, reference 2, reference, arXiv), with absolute convergence to avoid a non-decreasing process.

In this updated version of BacKBayDiffMod, we integrate a simulation with and without the gauge group ("gauge_group" = 'SU(3)').

Testing: BacKBayDiffMod updated to incorporate Yang-Mills theory (link).

The complete updated notebook: BacKBayDiffMod, integrating a simulation with and without the gauge group ("gauge_group" = 'SU').

To understand all the subtleties in depth, you can consult these sources:

(Reference 0) (Reference 1) (Reference 2) (Reference 3) (Reference 4) (Reference 5) (Reference 6 (best!)) (Reference 7) (Reference 8) (Reference 9(best)) (Reference 10(best!!!! youtube : Stochastic Quantisation of Yang-Mills)) (Reference 11) (Reference 12) (Reference 13) (Reference 14)


The complete notebook (notebook, Backdoor-based Bayesian Diffusion Model, Hugging Face ASR) demonstrating the feasibility of this attack is available here. As far as I know, I've managed to backdoor all of Hugging Face's pre-trained ASR models without exception, and the attack is undetectable!

The attack can also be extended, of course, to other DNN architectures - it will still work!

## See here and here for a deeper understanding of the mathematical concepts

Fokker-Planck equation; diffusion

Type of change

This backdoor attack implements a poisoning attack with a clean-label backdoor. It contains methods such as a poisoning attack method, which takes the audio data and corresponding labels as input and returns the poisoned audio data and labels. The Bayesian part is implemented using a prior and the PyMC framework, with the Fokker-Planck equation used for sampling to obtain and define the prior distribution. A diffusion technique named `back_diffusion_sampling` is then applied; it implements a diffusion-based sampling technique that generates a sequence of samples as a function of certain parameters and a noise distribution.
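To make the link between a Fokker-Planck equation and sampling concrete, here is a minimal sketch. It is not the notebook's `back_diffusion_sampling`; it only simulates an Ornstein-Uhlenbeck process dx = -theta * x dt + sigma dW with the Euler-Maruyama scheme. The density of such a process evolves according to a Fokker-Planck equation and settles to a Gaussian with variance sigma^2 / (2 * theta). The function name and all parameter values are illustrative assumptions.

```python
import numpy as np

def ou_euler_maruyama(n_paths, n_steps, dt, theta=1.0, sigma=0.5, seed=0):
    """Simulate dx = -theta * x dt + sigma dW with Euler-Maruyama (hypothetical helper)."""
    rng = np.random.default_rng(seed)
    x = np.zeros(n_paths)  # every path starts at 0
    for _ in range(n_steps):
        noise = rng.standard_normal(n_paths)
        # Drift pulls each path towards 0, diffusion adds Gaussian noise scaled by sqrt(dt).
        x = x - theta * x * dt + sigma * np.sqrt(dt) * noise
    return x

samples = ou_euler_maruyama(n_paths=20000, n_steps=1000, dt=0.01)
stationary_var = 0.5 ** 2 / (2 * 1.0)  # sigma^2 / (2 * theta), the Fokker-Planck steady state
```

After enough steps, the empirical variance of `samples` should be close to `stationary_var`, up to discretisation and sampling error.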

Ever since the introduction of large language models (LLMs), industry and academia have strived to deploy LLM-based AI models at scale in a bid to save time and deliver results on schedule. LLMs are used in many large-scale machine learning pipelines, which helps save time and get faster results, as they mostly derive from foundation models built on deep neural networks (DNNs). In fact, some of these models, such as those that rely on DNNs to produce sound, often use diffusion approaches. Diffusion models are state-of-the-art deep generative models trained to learn forward and backward diffusion processes through the progressive addition of noise followed by denoising.
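The forward (noising) half of a diffusion process can be sketched in a few lines. This is a generic variance-preserving formulation in the style of DDPM, not the code from the notebook; the linear `betas` schedule, the 440 Hz test tone and the function name are assumptions chosen for illustration. The backward (denoising) half requires a trained network and is not shown.

```python
import numpy as np

def forward_diffuse(x0, t, betas, rng):
    """Sample x_t from q(x_t | x_0) in closed form (hypothetical helper)."""
    alpha_bar = np.cumprod(1.0 - betas)  # fraction of signal variance kept up to each step
    eps = rng.standard_normal(x0.shape)
    xt = np.sqrt(alpha_bar[t]) * x0 + np.sqrt(1.0 - alpha_bar[t]) * eps
    return xt, eps

rng = np.random.default_rng(0)
betas = np.linspace(1e-4, 0.02, 1000)                     # assumed linear noise schedule
x0 = np.sin(2 * np.pi * 440 * np.arange(16000) / 16000)   # 1 s of a 440 Hz tone at 16 kHz
x_early, _ = forward_diffuse(x0, 10, betas, rng)          # still close to the clean tone
x_late, _ = forward_diffuse(x0, 999, betas, rng)          # almost pure Gaussian noise
```

Early steps keep the waveform almost intact, while the last step is nearly uncorrelated with the input.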

In this backdoor attack, we seek to fool audio-based DNN models, such as those in the Hugging Face framework. The attack is based on poisoning the model's training data by incorporating backdoor diffusion sampling and a Bayesian approach to the distribution of the poisoned data. This approach allows poisoned data to stand in for clean data while remaining poisoned.


BacKBayDiffMod: we integrate a simulation with the gauge group "gauge_group" = 'SU(3)'


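import numpy as np
from typing import Callable, Any
from concurrent.futures import ProcessPoolExecutor

class YangMillsSimulator: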
    def __init__(self, alpha: np.ndarray, beta: np.ndarray, sigma: np.ndarray, noise_dist: Callable[[Any], np.ndarray], particle_creation_probability: float = 0.1):
        self.alpha = alpha
        self.beta = beta
        self.sigma = sigma
        self.noise_dist = noise_dist
        self.particle_creation_probability = particle_creation_probability
        self.gauge_group = 'SU(3)'  # Example of a gauge group

    def calculate_mass_gap(self, t: int) -> float:
        return np.sqrt(self.alpha[t])

    def simulate_particle_creation(self, x: float, t: int, temperature: float) -> float:
        mass_gap = self.calculate_mass_gap(t)
        adjusted_probability = self.particle_creation_probability * np.exp(-mass_gap / temperature)
        if not np.random.rand() < adjusted_probability:
            return 0.0
        return mass_gap * self.noise_dist(self.beta[t])

    def simulate_lattice(self, lattice_size: int, temperature: float) -> np.ndarray:
        lattice = np.zeros((lattice_size, lattice_size))
        for i in range(lattice_size):
            for j in range(lattice_size):
                particle = self.simulate_particle_creation(i, j, temperature)
                lattice[i, j] = particle
        return lattice

    def generate_su3_matrix(self) -> np.ndarray:
        """Generate a random SU(3) matrix (complex, unitary, unit determinant)."""
        # QR-decompose a random complex matrix; the Q factor is unitary
        z = np.random.randn(3, 3) + 1j * np.random.randn(3, 3)
        q, _ = np.linalg.qr(z)
        # Divide out the overall phase so the determinant equals 1
        return q / np.linalg.det(q) ** (1 / 3)

    def apply_gauge_transformation(self, particle: float, transformation_matrix: np.ndarray) -> float:
        """Apply a gauge transformation to a particle. Now uses a randomly generated SU(3) matrix."""
        # Ensure the transformation matrix is normalized (without modifying the caller's array)
        normalized = transformation_matrix / np.linalg.det(transformation_matrix) ** (1 / 3)
        # Assumption: the original indexed a matrix row with int(particle), which returns a
        # 3-vector and can run out of bounds. Scaling by the normalized real trace keeps the
        # result a scalar so the lattice can be reshaped afterwards.
        return float(particle * np.real(np.trace(normalized)) / 3)

    def simulate_interactions(self, lattice: np.ndarray, temperature: float) -> np.ndarray:
        # Generate a new SU(3) matrix for each particle
        transformed = [self.apply_gauge_transformation(particle, self.generate_su3_matrix()) for particle in lattice.flat]
        return np.array(transformed).reshape(lattice.shape)

    def run_simulation(self, lattice_size: int, temperature: float, steps: int):
        lattice = self.simulate_lattice(lattice_size, temperature)
        for step in range(steps):
            lattice = self.simulate_interactions(lattice, temperature)
            # Optionally, update parameters like alpha, beta, sigma based on the current state
        return lattice

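    def parallel_simulation(self, lattice_size: int, temperature: float, steps: int, num_processes: int):
        # Run independent simulations in separate processes and average the resulting lattices.
        # The simulator (including noise_dist) must be picklable, so noise_dist cannot be a lambda.
        with ProcessPoolExecutor(num_processes) as executor:
            futures = [executor.submit(self.run_simulation, lattice_size, temperature, steps) for _ in range(num_processes)]
            results = [future.result() for future in futures]
        return np.mean(results, axis=0)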

OrsonTyphanel93, Feb 03 '24 23:02

Thanks !!!!

OrsonTyphanel93, Feb 04 '24 00:02

Hi guys @beat-buesser, @f4str, thanks, great job!

Personally, I will stop working on attacks (I think I've already done the trick). From now on I will contribute more to improving some of the existing defenses in ART :), depending on my availability.

Thanks a lot!

OrsonTyphanel93, Feb 18 '24 17:02


Please always cite ART if you use this code.

For very complex parallel calculations, please use this second option:


import numpy as np
from typing import Callable, Any
from joblib import Parallel, delayed
from sklearn.ensemble import RandomForestRegressor

class YangMillsSimulator:
    def __init__(self, alpha: np.ndarray, beta: np.ndarray, sigma: np.ndarray, noise_dist: Callable[[Any], np.ndarray], particle_creation_probability: float = 0.1):
        self.alpha = alpha
        self.beta = beta
        self.sigma = sigma
        self.noise_dist = noise_dist
        self.particle_creation_probability = particle_creation_probability
        self.model = RandomForestRegressor(n_estimators=100, random_state=0)

    def calculate_mass_gap(self, t: int) -> float:
        return np.sqrt(self.alpha[t])

    def simulate_particle_creation(self, x: float, t: int) -> float:
        if not np.random.rand() < self.particle_creation_probability:
            return 0.0
        mass_gap = self.calculate_mass_gap(t)
        # Incorporate quantum effects
        quantum_effect = np.exp(-self.beta[t] / (2 * mass_gap))
        return mass_gap * self.noise_dist(self.beta[t]) * quantum_effect

    def simulate_lattice(self, lattice_size: int, temperature: float) -> np.ndarray:
        # Note: temperature is accepted but not used by this variant
        # Use joblib for parallel execution, ensuring all tasks complete
        results = Parallel(n_jobs=-1, backend='loky')(
            delayed(self.simulate_particle_creation)(i, j)
            for i in range(lattice_size)
            for j in range(lattice_size)
        )

        # Reshape the results into the lattice shape
        return np.array(results).reshape(lattice_size, lattice_size)

    def simulate_quark_confinement(self, confinement_scale: float) -> np.ndarray:
        confined_mass_gap = self.alpha / confinement_scale
        # Simulate the effects of quark confinement in a more dynamic manner
        confined_mass_gap = np.where(confined_mass_gap > 0, confined_mass_gap, 0)
        return np.array(confined_mass_gap)

    def train_model(self, training_data: np.ndarray, training_labels: np.ndarray):
        # Fit the random forest regressor created in __init__
        self.model.fit(training_data, training_labels)

    def predict_outcomes(self, test_data: np.ndarray) -> np.ndarray:
        return self.model.predict(test_data)
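One thing to watch with the parallel variants: they draw from the global `np.random` state inside worker tasks, which makes results hard to reproduce across workers. A minimal sketch of one way around this is to give every task its own generator spawned from a single seed. The helper names are hypothetical, a standard normal stands in for `noise_dist`, and threads are used here only to keep the example self-contained.

```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor

def simulate_row(seed_seq, row_length, creation_probability):
    """Fill one lattice row using its own random generator (hypothetical helper)."""
    rng = np.random.default_rng(seed_seq)
    created = rng.random(row_length) < creation_probability
    # Standard normal noise stands in for the simulator's noise_dist (assumption).
    return np.where(created, rng.standard_normal(row_length), 0.0)

def simulate_lattice_parallel(lattice_size, creation_probability=0.1, seed=0, workers=4):
    # Spawn one statistically independent child seed per row.
    child_seeds = np.random.SeedSequence(seed).spawn(lattice_size)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        rows = list(pool.map(
            lambda s: simulate_row(s, lattice_size, creation_probability),
            child_seeds,
        ))
    return np.stack(rows)

lattice_a = simulate_lattice_parallel(8, seed=42)
lattice_b = simulate_lattice_parallel(8, seed=42)
```

Because each row owns its generator and `map` preserves order, the same seed gives the same lattice regardless of how the tasks are scheduled.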

OrsonTyphanel93, Apr 14 '24 05:04

Please always cite ART (Adversarial-Robustness-Toolbox) if you use this code.

For a realistic method, you need to know how you'll obtain your particles in the real world.

particles = get_particles_array() # This is a placeholder for your real-life method of obtaining particles


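import numpy as np
from typing import Callable, Any
from concurrent.futures import ProcessPoolExecutor

class YangMillsSimulator: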
    def __init__(self, alpha: np.ndarray, beta: np.ndarray, sigma: np.ndarray, noise_dist: Callable[[Any], np.ndarray], particle_creation_probability: float = 0.1):
        self.alpha = alpha
        self.beta = beta
        self.sigma = sigma
        self.noise_dist = noise_dist
        self.particle_creation_probability = particle_creation_probability

    def calculate_mass_gap(self, t: int) -> float:
        return np.sqrt(self.alpha[t])

    def simulate_particle_creation(self, x: float, t: int, temperature: float, particles: np.ndarray) -> float:
        mass_gap = self.calculate_mass_gap(t)
        adjusted_probability = self.particle_creation_probability * np.exp(-mass_gap / temperature)
        if not np.random.rand() < adjusted_probability:
            return 0.0

        G = 6.67430e-11 # Gravitational constant
        softening = 1e-9 # Softening parameter to avoid numerical issues
        forces = np.zeros_like(particles)
        for i in range(len(particles)):
            for j in range(i+1, len(particles)):
                r = particles[j] - particles[i]
                r_norm = np.linalg.norm(r)
                # Softened inverse-square law: stays finite even when r_norm is 0
                force = G * particles[i] * particles[j] / (r_norm**2 + softening**2)
                forces[i] += force
                forces[j] -= force

        # Note: this updates the caller's particles array in place
        particles += forces * self.noise_dist(self.beta[t])

        return mass_gap * self.noise_dist(self.beta[t])

    def simulate_lattice(self, lattice_size: int, temperature: float) -> np.ndarray:
        lattice = np.zeros((lattice_size, lattice_size))
        for i in range(lattice_size):
            for j in range(lattice_size):
                particle = self.simulate_particle_creation(i, j, temperature, lattice)
                lattice[i, j] = particle
        return lattice

    def simulate_interactions(self, lattice: np.ndarray, temperature: float) -> np.ndarray:
        # Simulate interactions based on the current state of the lattice
        # This is a placeholder for more complex interaction calculations
        return lattice

    def run_simulation(self, lattice_size: int, temperature: float, steps: int):
        lattice = self.simulate_lattice(lattice_size, temperature)
        for step in range(steps):
            lattice = self.simulate_interactions(lattice, temperature)
        return lattice

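    def parallel_simulation(self, lattice_size: int, temperature: float, steps: int, num_processes: int):
        # Run independent simulations in separate processes and average the resulting lattices.
        # The simulator (including noise_dist) must be picklable, so noise_dist cannot be a lambda.
        with ProcessPoolExecutor(num_processes) as executor:
            futures = [executor.submit(self.run_simulation, lattice_size, temperature, steps) for _ in range(num_processes)]
            results = [future.result() for future in futures]
        return np.mean(results, axis=0)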
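The double loop above uses the lattice rows as both coordinates and masses. If positions and masses are kept as separate arrays, which is an assumption and not what the class does, the softened pairwise forces can be computed in one vectorised step. A minimal sketch with a hypothetical helper name:

```python
import numpy as np

G = 6.67430e-11  # gravitational constant in SI units

def softened_forces(positions, masses, softening=1e-9):
    """Net gravitational force on every body, with softened distances (hypothetical helper)."""
    # diff[i, j] is the vector pointing from body i to body j.
    diff = positions[None, :, :] - positions[:, None, :]
    dist_sq = (diff ** 2).sum(axis=-1) + softening ** 2
    inv_dist_cubed = dist_sq ** -1.5
    np.fill_diagonal(inv_dist_cubed, 0.0)  # a body exerts no force on itself
    pair_mass = masses[:, None] * masses[None, :]
    # Sum the pairwise force vectors over the second body index.
    return (G * (pair_mass * inv_dist_cubed)[:, :, None] * diff).sum(axis=1)

positions = np.array([[0.0, 0.0, 0.0], [1.0, 0.0, 0.0]])
masses = np.array([1.0, 1.0])
forces = softened_forces(positions, masses)
```

For two unit masses one metre apart, each body is pulled towards the other with a force of magnitude `G`, and the two forces cancel.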

OrsonTyphanel93, Apr 15 '24 07:04