database icon indicating copy to clipboard operation
database copied to clipboard

💡 Add support for MySQL: ON DUPLICATE KEY UPDATE, PostgreSQL: ON CONFLICT

Open insane-dev opened this issue 3 years ago • 1 comments

I have an idea!

Hello, Cycle team!

I've been looking for an ability to write more complex insert queries. And so far I can't find a way to build a query like this one:

INSERT INTO some_table (`key`, `value`) VALUES (...) ON DUPLICATE KEY UPDATE `value` = VALUES(`value`)

As I understand PostgreSQL also has this functionality: https://www.postgresql.org/docs/current/sql-insert.html#SQL-ON-CONFLICT

So I believe it would be really great to be able to do something like this:

// Proposed API sketch: onConflict() / onDuplicate() is the method being requested here.
$this->database()
  ->insert('some_table')
  ->columns('key', 'value')
  ->values([['key_1', 'value_1'], ['key_2', 'value_2']])
  ->onConflict(new Fragment('value = VALUES(value)')) // or ->onDuplicate(...)
  ->run();

Because now, I have to write it like:

// Current workaround: the whole statement is assembled by hand as a string...
$rows = [...];
$query = <<<SQL
INSERT INTO `some_table` (`key`, `value`) VALUES $rows ON DUPLICATE KEY UPDATE `value` = VALUES(`value`)
SQL;

// ...and sent through the driver directly instead of through the query builder.
$this->database()->getDriver()->execute($query);

Any feedback on this would be appreciated.

insane-dev avatar Feb 14 '22 12:02 insane-dev

I created upsert functionality for a project more than a year ago. If I manage to find the time, I'll try to put together a proper pull request.

In the meantime, here's the basic idea.

Because the relevant code is tied to either interfaces or final classes, you'll need to override some files, i.e. tell Composer to autoload your own copies of those files from the paths that contain them.

{
    "autoload": {
        "psr-4": {
            "Cycle\\Database\\": "src/YourProject/Override/Cycle/Database/",
            "Cycle\\ORM\\": "src/YourProject/Override/Cycle/ORM/"
        }
    }
}

The files below must be copied from the related source files, i.e. you cannot extend the base file; it needs to be copied in full. In situations like this I tend to comment /** BEGIN MODIFICATIONS */ and /** END MODIFICATIONS */ in order to keep track of what code is original and what has been modified.

src/YourProject/Override/Cycle/Database/Driver/MySQL/MySQLCompiler.php
<?php

declare(strict_types=1);

namespace Cycle\Database\Driver\MySQL;

class MySQLCompiler extends Compiler implements CachingCompilerInterface
{
    /** BEGIN MODIFICATIONS */

    /**
     * Compiles a fragment, routing upsert queries to {@see self::upsertQuery()}.
     *
     * Any fragment whose type is not UPSERT_QUERY is delegated to the parent
     * compiler unchanged.
     */
    protected function fragment(
        QueryParameters $params,
        Quoter $q,
        FragmentInterface $fragment,
        bool $nestedQuery = true
    ): string {
        if ($fragment->getType() === self::UPSERT_QUERY) {
            return $this->upsertQuery($params, $q, $fragment->getTokens());
        }

        return parent::fragment($params, $q, $fragment, $nestedQuery);
    }

    /**
     * Builds `INSERT ... VALUES ... AS <alias> ON DUPLICATE KEY UPDATE ...`.
     *
     * Every column listed in the query is overwritten with the value of the
     * incoming row when a duplicate key is hit.
     *
     * NOTE(review): the `VALUES ... AS alias` row-alias form needs a server
     * that supports it (MySQL 8.0.19+ per the MySQL manual) - confirm for
     * your target server.
     * NOTE(review): an empty `columns` token produces an invalid statement
     * here - confirm callers never upsert an empty row.
     *
     * @param array $tokens Query tokens; reads the `table`, `columns` and `values` keys.
     */
    protected function upsertQuery(QueryParameters $params, Quoter $q, array $tokens): string
    {
        $values = [];

        foreach ($tokens['values'] as $value) {
            $values[] = $this->value($params, $q, $value);
        }

        // The alias is derived from the table name instead of random bytes so
        // that the same query always compiles to the same SQL text (cacheable,
        // assertable in tests). The literal prefix guarantees it starts with a
        // letter, so it can never be all digits or look like a number (`12e3`).
        $alias = 'upsert_' . hash('crc32b', (string) $tokens['table']);

        return sprintf(
            'INSERT INTO %s (%s) VALUES %s AS %s ON DUPLICATE KEY UPDATE %s',
            $this->name($params, $q, $tokens['table'], true),
            $this->columns($params, $q, $tokens['columns']),
            implode(', ', $values),
            $this->name($params, $q, $alias),
            $this->updates($params, $q, $tokens['columns'], $alias)
        );
    }

    /**
     * Builds the `col = alias.col, ...` assignment list of the upsert.
     *
     * @param array  $columns   Column names to assign.
     * @param string $alias     Unquoted alias of the incoming row.
     * @param int    $maxLength Width passed to wordwrap() for the joined list.
     */
    protected function updates(
        QueryParameters $params,
        Quoter $q,
        array $columns,
        string $alias,
        int $maxLength = 180
    ): string {
        // The alias is the same for every column: quote it once, outside the loop.
        $quotedAlias = $this->name($params, $q, $alias);

        $assignments = array_map(
            function ($column) use ($params, $q, $quotedAlias): string {
                $name = $this->name($params, $q, $column);

                return sprintf('%s = %s.%s', $name, $quotedAlias, $name);
            },
            $columns
        );

        return wordwrap(implode(', ', $assignments), $maxLength);
    }
    /** END MODIFICATIONS */
}
src/YourProject/Override/Cycle/Database/Driver/MySQL/MySQLDriver.php
<?php

declare(strict_types=1);

namespace Cycle\Database\Driver\MySQL;

class MySQLDriver extends Driver
{
    /**
     * Creates the driver with its MySQL handler, compiler and query builder.
     *
     * The query builder receives an upsert query prototype in addition to the
     * select, insert, update and delete prototypes.
     *
     * @param MySQLDriverConfig $config
     */
    public static function create(DriverConfig $config): static
    {
        return new static(
            $config,
            new MySQLHandler(),
            new MySQLCompiler('``'),
            new QueryBuilder(
                new MySQLSelectQuery(),
                new InsertQuery(),
                new MySQLUpdateQuery(),
                // BEGIN MODIFICATIONS
                // Fully qualified so this line works without adding a `use`
                // statement: UpsertQuery lives in Cycle\Database\Query, not in
                // this file's namespace.
                new \Cycle\Database\Query\UpsertQuery(),
                // END MODIFICATIONS
                new MySQLDeleteQuery()
            )
        );
    }
}
src/YourProject/Override/Cycle/Database/Driver/CompilerInterface.php
<?php

declare(strict_types=1);

namespace Cycle\Database\Driver;

/**
 * @codeCoverageIgnore
 */
interface CompilerInterface
{
    /** BEGIN MODIFICATIONS */
    /**
     * Fragment type of an upsert query: returned by UpsertQuery::getType() and
     * checked by MySQLCompiler::fragment().
     *
     * NOTE(review): the value must not collide with any other type constant
     * already declared in this interface - confirm against your installed version.
     */
    public const UPSERT_QUERY = 9;
    /** END MODIFICATIONS */
}
src/YourProject/Override/Cycle/Database/Query/BuilderInterface.php
<?php

declare(strict_types=1);

namespace Cycle\Database\Query;

interface BuilderInterface
{
    /** BEGIN MODIFICATIONS */
    /**
     * Returns an upsert query.
     *
     * @param string      $prefix Prefix handed to the query together with the driver.
     * @param string|null $table  Target table; when null no table is set on the query.
     */
    public function upsertQuery(
        string $prefix,
        ?string $table = null
    ): UpsertQuery;

    /** END MODIFICATIONS */
}
src/YourProject/Override/Cycle/Database/Query/QueryBuilder.php
<?php

declare(strict_types=1);

namespace Cycle\Database\Query;

final class QueryBuilder implements BuilderInterface
{
    /**
     * Stores one prototype query per statement kind.
     *
     * NOTE(review): the upsert prototype sits between the update and delete
     * prototypes, so any caller that passes four positional arguments has to
     * be updated as well - verify the other drivers.
     */
    public function __construct(
        private SelectQuery $selectQuery,
        private InsertQuery $insertQuery,
        private UpdateQuery $updateQuery,
        private UpsertQuery $upsertQuery, // BEGIN MODIFICATIONS
        private DeleteQuery $deleteQuery
    ) {
    }

    /** BEGIN MODIFICATIONS */

    /**
     * Returns the upsert prototype bound to the current driver and prefix,
     * pointed at $table when one is given.
     */
    public function upsertQuery(string $prefix, ?string $table = null): UpsertQuery
    {
        $query = $this->upsertQuery->withDriver($this->driver, $prefix);

        // No table requested: hand the query back without a target.
        if ($table === null) {
            return $query;
        }

        $query->into($table);

        return $query;
    }

    /** END MODIFICATIONS */

    /**
     * Builds a query builder from the generic (driver-agnostic) query classes.
     */
    public static function defaultBuilder(): self
    {
        $select = new SelectQuery();
        $insert = new InsertQuery();
        $update = new UpdateQuery();
        $upsert = new UpsertQuery(); // BEGIN MODIFICATIONS
        $delete = new DeleteQuery();

        return new self($select, $insert, $update, $upsert, $delete);
    }
}
src/YourProject/Override/Cycle/Database/Query/UpsertQuery.php
<?php

declare(strict_types=1);

namespace Cycle\Database\Query;

use Cycle\Database\Driver\CompilerInterface;

/**
 * Insert query that reports itself as an upsert, so a compiler can tell it
 * apart from a plain insert by its type.
 *
 * @codeCoverageIgnore
 */
class UpsertQuery extends InsertQuery
{
    /**
     * @return int Always CompilerInterface::UPSERT_QUERY.
     */
    public function getType(): int
    {
        return CompilerInterface::UPSERT_QUERY;
    }
}
src/YourProject/Override/Cycle/Database/Database.php
<?php

declare(strict_types=1);

namespace Cycle\Database;

final class Database implements DatabaseInterface
{
    /** BEGIN MODIFICATIONS */

    /**
     * Returns an upsert query from the write driver's query builder.
     *
     * The return type is fully qualified so the method works without an extra
     * `use` statement: UpsertQuery lives in Cycle\Database\Query, not in this
     * file's namespace.
     *
     * @param string|null $table Target table; passed through to the query builder as-is.
     */
    public function upsert(?string $table = null): \Cycle\Database\Query\UpsertQuery
    {
        return $this->getDriver(self::WRITE)
            ->getQueryBuilder()
            ->upsertQuery($this->prefix, $table);
    }

    /** END MODIFICATIONS */
}
src/YourProject/Override/Cycle/Database/DatabaseInterface.php
<?php

declare(strict_types=1);

namespace Cycle\Database;

interface DatabaseInterface
{
    /** BEGIN MODIFICATIONS */

    /**
     * Returns an upsert query, optionally associated with a table.
     *
     * The return type is fully qualified so the declaration works without an
     * extra `use` statement: UpsertQuery lives in Cycle\Database\Query, not in
     * this file's namespace.
     *
     * @param string|null $table Target table, or null to leave it unset.
     */
    public function upsert(?string $table = null): \Cycle\Database\Query\UpsertQuery;

    /** END MODIFICATIONS */
}
src/YourProject/Override/Cycle/Database/Table.php
<?php

declare(strict_types=1);

namespace Cycle\Database;

final class Table implements TableInterface, \IteratorAggregate, \Countable
{
    /** BEGIN MODIFICATIONS */

    /**
     * Upserts a single row into this table.
     *
     * @param array $rowset Row data handed to the query's values().
     *
     * @return int|string|null Whatever the upsert query's run() returns.
     */
    public function upsertOne(array $rowset = []): int|string|null
    {
        $query = $this->database->upsert($this->name);
        $query = $query->values($rowset);

        return $query->run();
    }

    /**
     * Upserts several rows into this table in one query.
     *
     * @param array $columns Column names handed to the query's columns().
     * @param array $rowsets Row values handed to the query's values().
     */
    public function upsertMultiple(array $columns = [], array $rowsets = []): void
    {
        $query = $this->database->upsert($this->name);
        $query = $query->columns($columns);
        $query = $query->values($rowsets);

        $query->run();
    }

    /** END MODIFICATIONS */
}
src/YourProject/Override/Cycle/ORM/Mapper/DatabaseMapper.php
<?php

declare(strict_types=1);

namespace Cycle\ORM\Mapper;

abstract class DatabaseMapper implements MapperInterface
{
    /** BEGIN MODIFICATIONS */

    /**
     * Creates the command that upserts the given state.
     *
     * Marks the state as scheduled for insert and, when any primary key is
     * missing from the state data, registers the values produced by
     * nextPrimaryKey() (if any) on the state before building the command.
     *
     * $entity and $node are not read by this method.
     */
    public function queueUpsert(object $entity, Node $node, State $state): CommandInterface
    {
        $values = $state->getData();

        $state->setStatus(Node::SCHEDULED_INSERT);

        foreach ($this->primaryKeys as $key) {
            if (! isset($values[$key])) {
                foreach ($this->nextPrimaryKey() ?? [] as $pk => $value) {
                    $state->register($pk, $value);
                }
                // One missing key is enough: nextPrimaryKey() is asked only once.
                break;
            }
        }

        // Fully qualified so this works without adding a `use` statement:
        // Upsert lives in Cycle\ORM\Command\Database, not in this file's namespace.
        return new \Cycle\ORM\Command\Database\Upsert(
            $this->source->getDatabase(),
            $this->source->getTable(),
            $state,
            $this,
            $this->primaryKeys,
            // A single PK column can be handed over by name; otherwise pass null.
            count($this->primaryColumns) === 1 ? $this->primaryColumns[0] : null,
        );
    }

    /** END MODIFICATIONS */
}
src/YourProject/Override/Cycle/ORM/Command/Database/Upsert.php
<?php

declare(strict_types=1);

namespace Cycle\ORM\Command\Database;

use Cycle\Database\DatabaseInterface;
use Cycle\Database\Query\ReturningInterface;
use Cycle\ORM\Command\StoreCommand;
use Cycle\ORM\Command\Traits\ErrorTrait;
use Cycle\ORM\Command\Traits\MapperTrait;
use Cycle\ORM\Heap\State;
use Cycle\ORM\MapperInterface;

/**
 * Upsert data into associated table and provide lastInsertID promise.
 *
 * Has the same shape as an insert command; the difference visible here is that
 * execute() builds its query with DatabaseInterface::upsert().
 *
 * @codeCoverageIgnore
 */
final class Upsert extends StoreCommand
{
    use ErrorTrait;
    use MapperTrait;

    /**
     * @param DatabaseInterface    $db          Passed to the parent command.
     * @param string               $table       Passed to the parent command.
     * @param State                $state       Passed to the parent command.
     * @param MapperInterface|null $mapper      Optional mapper, used for column mapping and casting.
     * @param string[]             $primaryKeys Primary key names as they appear in the state data.
     * @param string|null          $pkColumn    Column requested via returning() when the query supports it.
     */
    public function __construct(
        DatabaseInterface $db,
        string $table,
        State $state,
        ?MapperInterface $mapper,
        /** @var string[] */
        private array $primaryKeys = [],
        private ?string $pkColumn = null
    ) {
        parent::__construct($db, $table, $state);
        $this->mapper = $mapper;
    }

    /**
     * This command never waits on anything: it always reports ready.
     */
    public function isReady(): bool
    {
        return true;
    }

    /**
     * True when either the command's own columns or the state data are non-empty.
     */
    public function hasData(): bool
    {
        return $this->columns !== [] || $this->state->getData() !== [];
    }

    /**
     * Returns the command's columns merged with the state data.
     *
     * A non-empty appendix is first written to the state and then cleared.
     * The state data goes through the mapper's mapColumns() when a mapper is
     * set; the raw data is used when there is no mapper or it yields null.
     */
    public function getStoreData(): array
    {
        if ($this->appendix !== []) {
            $this->state->setData($this->appendix);
            $this->appendix = [];
        }
        $data = $this->state->getData();
        return array_merge($this->columns, $this->mapper?->mapColumns($data) ?? $data);
    }

    /**
     * Upsert data into associated table.
     *
     * When run() returns an id and there is exactly one primary key that the
     * state data did not already carry, that id is registered on the state
     * (cast through the mapper when one is set).
     */
    public function execute(): void
    {
        $state = $this->state;

        if ($this->appendix !== []) {
            $state->setData($this->appendix);
        }

        // $data keeps the state data as read; $uncasted is the copy that is filtered and prepared.
        $uncasted = $data = $state->getData();

        // filter PK null values
        foreach ($this->primaryKeys as $key) {
            if (!isset($uncasted[$key])) {
                unset($uncasted[$key]);
            }
        }
        $uncasted = $this->prepareData($uncasted);

        $upsert = $this->db
            ->upsert($this->table) // BEGIN MODIFICATIONS
            ->values(\array_merge($this->columns, $uncasted));

        // Only a query object implementing ReturningInterface is asked to return the PK column.
        if ($this->pkColumn !== null && $upsert instanceof ReturningInterface) {
            $upsert->returning($this->pkColumn);
        }

        $insertID = $upsert->run();

        if ($insertID !== null && \count($this->primaryKeys) === 1) {
            $fpk = $this->primaryKeys[0]; // first PK
            // Checked against $data (the state data before filtering), so an existing PK value is kept.
            if (!isset($data[$fpk])) {
                $state->register(
                    $fpk,
                    $this->mapper === null ? $insertID : $this->mapper->cast([$fpk => $insertID])[$fpk]
                );
            }
        }

        $state->updateTransactionData();

        parent::execute();
    }

    /**
     * Forwards the key/value pair to the state.
     */
    public function register(string $key, mixed $value): void
    {
        $this->state->register($key, $value);
    }
}

puzzledpolymath avatar Jun 12 '25 08:06 puzzledpolymath