Python static analysis (lint) plugin
User story
As a user, I want to have an integrated Python static analysis plugin, so that I can automatically check my Python code for errors, coding standards violations or other potential issues.
PyLint has a set of rules (to be found here) that offer static analysis checks for Python code.
They are sorted into severity groups but they are missing any categorization or labeling.
We could use manual sorting or a script to get them into the desired categories (code style | bug prevention).
TODO:
- [x] research how rules could be split into categories
- [x] make use of the `--list-msgs-enabled` option to see which rules are included in static analysis
Implemented as a custom plugin for a customer, along with an equivalent of the JS Packages plugin for the Poetry package manager.
code-pushup.pylint.plugin.ts
import type {
Audit,
AuditOutput,
Group,
Issue,
IssueSeverity,
PluginConfig,
} from "@code-pushup/models";
import {
capitalize,
compareIssueSeverity,
countOccurrences,
executeProcess,
objectToEntries,
pluralizeToken,
truncateIssueMessage,
} from "@code-pushup/utils";
/**
 * Creates the Code PushUp plugin configuration for PyLint.
 *
 * The enabled messages are discovered once up front and used to derive both
 * the audits and the groups; linting itself only happens when the returned
 * `runner` is invoked.
 *
 * @param pattern Target handed to `findEnabledMessages` and `runLint`.
 * @returns Plugin config with slug `pylint`.
 */
export default async function pylintPlugin(
  pattern: string
): Promise<PluginConfig> {
  const messages = await findEnabledMessages(pattern);

  const audits = listAudits(messages);
  const groups = listGroups(messages);
  // Deferred: the lint run is started by whoever calls the runner.
  const runner = () => runLint(pattern, audits);

  return {
    slug: "pylint",
    title: "PyLint",
    icon: "python",
    audits,
    groups,
    runner,
  };
}
/**
 * Shape that `runLint` assumes for the JSON it parses from the stdout of
 * `python -m pylint --output-format=json2`. The parsed value is cast to this
 * type without runtime validation.
 */
type PylintJson2 = {
// Individual findings; `runLint` groups these by `symbol`.
messages: PylintMessage[];
// Not read by the code shown here.
statistics: PylintStatistics;
};
/**
 * Message categories used by this plugin. `messageIdToType` derives one of
 * these from the first letter of a message ID (F, E, W, R, C, I), and
 * `messageTypeToSeverity` collapses them into issue severities.
 */
type PylintMessageType =
| "fatal"
| "error"
| "warning"
| "refactor"
| "convention"
| "info";
/**
 * One entry of `PylintJson2.messages`.
 *
 * The visible code reads `symbol` (to match a message to an audit slug) and
 * `type`, `message`, `path`, `line`, `column`, `endLine`, `endColumn` (to build
 * an issue); the remaining fields are declared but unused here.
 */
type PylintMessage = {
type: PylintMessageType;
// Matched against audit slugs in `runLint`.
symbol: string;
message: string;
messageId: string;
confidence: string;
module: string;
obj: string;
line: number;
// NOTE(review): `messageToIssue` adds 1 to `column` and `endColumn`, which
// suggests pylint reports them 0-based — confirm against pylint's output.
column: number;
// `endLine` / `endColumn` may be null; they are omitted from the issue then.
endLine: number | null;
endColumn: number | null;
// Used as the issue's source file.
path: string;
absolutePath: string;
};
/**
 * Type of the `statistics` property of `PylintJson2`.
 * None of these fields are read by the code shown here.
 */
type PylintStatistics = {
messageTypeCount: Record<PylintMessageType, number>;
modulesLinted: number;
score: number;
};
/**
 * A message parsed by `findEnabledMessages` from the "Enabled messages:"
 * section of `pylint --list-msgs-enabled` output.
 */
type EnabledMessage = {
// Captured by `[\w-]+` in the parser; used as the audit slug.
symbol: string;
// Captured by `[A-Z]\d+` in the parser; its first letter selects the message type.
messageId: string;
};
/**
 * Lists the messages PyLint reports as enabled for the given target.
 *
 * Runs `python -m pylint --list-msgs-enabled <pattern>` and parses the
 * indented `symbol (ID)` lines that follow the "Enabled messages:" header,
 * stopping at the first non-indented line (or at the end of the output).
 *
 * @param pattern Target passed to pylint as a positional argument.
 * @returns Enabled messages in output order; empty when the header is absent.
 */
async function findEnabledMessages(
  pattern: string
): Promise<EnabledMessage[]> {
  const { stdout } = await executeProcess({
    command: "python",
    args: ["-m", "pylint", "--list-msgs-enabled", pattern],
  });

  // Accept both LF and CRLF, otherwise the header comparison fails on "\r".
  const lines = stdout.split(/\r?\n/);
  const headerIndex = lines.indexOf("Enabled messages:");
  if (headerIndex === -1) {
    return [];
  }

  const afterHeader = lines.slice(headerIndex + 1);
  const sectionEnd = afterHeader.findIndex((line) => !line.startsWith(" "));
  // No terminating line means the section runs to the end of the output;
  // slicing with -1 here would silently drop the last message.
  const sectionLines =
    sectionEnd === -1 ? afterHeader : afterHeader.slice(0, sectionEnd);

  // Any indentation width is accepted so the parser does not depend on the
  // exact number of leading spaces pylint prints.
  const messagePattern = /^\s+([\w-]+)\s+\(([A-Z]\d+)\)\s*$/;

  return sectionLines.flatMap((line): EnabledMessage[] => {
    const match = messagePattern.exec(line);
    if (!match) {
      return [];
    }
    const [, symbol, messageId] = match;
    return symbol && messageId ? [{ symbol, messageId }] : [];
  });
}
/**
 * Turns every enabled PyLint message into an audit definition.
 *
 * The audit slug is the message symbol and the title is `symbol (ID)`. A
 * `docsUrl` is attached only when the message type can be derived from the ID,
 * because the type is part of the documentation URL.
 *
 * @param enabledMessages Messages to convert.
 * @returns One audit per message, in input order.
 */
function listAudits(enabledMessages: EnabledMessage[]): Audit[] {
  const audits: Audit[] = [];
  for (const { symbol, messageId } of enabledMessages) {
    const category = messageIdToType(messageId);
    const base: Audit = {
      slug: symbol,
      title: `${symbol} (${messageId})`,
    };
    audits.push(
      category
        ? {
            ...base,
            docsUrl: `https://pylint.readthedocs.io/en/stable/user_guide/messages/${category}/${symbol}.html`,
          }
        : base
    );
  }
  return audits;
}
/**
 * Groups the enabled PyLint messages by message type.
 *
 * Each type that has at least one enabled message becomes a group whose refs
 * point at the message symbols (audit slugs) with weight 1. Messages whose ID
 * does not map to a known type are left out. Groups appear in the order in
 * which their type is first encountered.
 *
 * @param enabledMessages Messages to group.
 * @returns One group per message type present in the input.
 */
function listGroups(enabledMessages: EnabledMessage[]): Group[] {
  // source: https://github.com/pylint-dev/pylint/blob/main/pylint/config/help_formatter.py#L47-L53
  const descriptions: Record<PylintMessageType, string> = {
    info: "for informational messages",
    convention: "for programming standard violation",
    refactor: "for bad code smell",
    warning: "for python specific problems",
    error: "for probable bugs in the code",
    fatal:
      "if an error occurred which prevented pylint from doing further processing",
  };

  // Keyed by PylintMessageType (not string) so that indexing `descriptions`
  // type-checks; appended in place to keep the grouping linear.
  const symbolsByType = new Map<PylintMessageType, string[]>();
  for (const { symbol, messageId } of enabledMessages) {
    const type = messageIdToType(messageId);
    if (!type) {
      continue;
    }
    const symbols = symbolsByType.get(type);
    if (symbols) {
      symbols.push(symbol);
    } else {
      symbolsByType.set(type, [symbol]);
    }
  }

  return [...symbolsByType].map(
    ([type, symbols]): Group => ({
      slug: type,
      title: capitalize(type),
      description: descriptions[type],
      docsUrl: `https://pylint.readthedocs.io/en/stable/user_guide/messages/messages_overview.html#${type}`,
      refs: symbols.map((symbol) => ({ slug: symbol, weight: 1 })),
    })
  );
}
/** Message type for each known first letter of a PyLint message ID. */
const MESSAGE_TYPE_BY_ID_PREFIX = new Map<string, PylintMessageType>([
  ["F", "fatal"],
  ["E", "error"],
  ["W", "warning"],
  ["R", "refactor"],
  ["C", "convention"],
  ["I", "info"],
]);

/**
 * Derives the message type from the first character of a PyLint message ID.
 *
 * @param messageId Message ID whose leading letter encodes the type.
 * @returns The matching type, or `null` when the leading character is not one
 *   of F, E, W, R, C, I (including an empty string).
 */
function messageIdToType(messageId: string): PylintMessageType | null {
  const prefix = messageId.charAt(0);
  return MESSAGE_TYPE_BY_ID_PREFIX.get(prefix) ?? null;
}
/**
 * Runs PyLint and converts its findings into one audit output per audit.
 *
 * Executes `python -m pylint --output-format=json2 <pattern>` with the exit
 * code ignored, parses stdout as JSON and groups the reported messages by
 * symbol. An audit scores 1 when it has no issues and 0 otherwise; its display
 * value lists issue counts per severity, or "passed".
 *
 * @param pattern Target passed to pylint as a positional argument.
 * @param audits Audits to report on; messages whose symbol matches no audit
 *   slug are ignored.
 * @returns Audit outputs in the same order as `audits`.
 * @throws Error with the stderr text when pylint wrote anything to stderr.
 * @throws SyntaxError when stdout is not valid JSON.
 */
async function runLint(
  pattern: string,
  audits: Audit[]
): Promise<AuditOutput[]> {
  const { stdout, stderr } = await executeProcess({
    command: "python",
    args: ["-m", "pylint", "--output-format=json2", pattern],
    ignoreExitCode: true,
  });
  if (stderr) {
    throw new Error(stderr);
  }

  // The report shape is assumed, not validated.
  const result = JSON.parse(stdout) as PylintJson2;

  // Group in place: copying the accumulator for every message is quadratic
  // in the number of findings.
  const issuesBySymbol = new Map<string, Issue[]>();
  for (const message of result.messages) {
    const issue = messageToIssue(message);
    const bucket = issuesBySymbol.get(message.symbol);
    if (bucket) {
      bucket.push(issue);
    } else {
      issuesBySymbol.set(message.symbol, [issue]);
    }
  }

  return audits.map(({ slug }): AuditOutput => {
    const issues = issuesBySymbol.get(slug) ?? [];
    const severityCounts = countOccurrences(
      issues.map(({ severity }) => severity)
    );
    const severities = objectToEntries(severityCounts);
    // Negated comparator reverses the order given by compareIssueSeverity.
    const summaryText =
      [...severities]
        .sort((a, b) => -compareIssueSeverity(a[0], b[0]))
        .map(([severity, count = 0]) => pluralizeToken(severity, count))
        .join(", ") || "passed";
    return {
      slug,
      score: Number(issues.length === 0),
      value: issues.length,
      displayValue: summaryText,
      details: { issues },
    };
  });
}
/**
 * Converts a PyLint message into a Code PushUp issue.
 *
 * Underscores in the message text are backslash-escaped before truncation,
 * columns are shifted by +1, and the end position is included only for the
 * parts pylint actually reported (non-null).
 *
 * @param pylintMessage Message taken from the parsed pylint report.
 * @returns Issue pointing at the message's `path` and position.
 */
function messageToIssue(pylintMessage: PylintMessage): Issue {
  const { line, column, endLine, endColumn } = pylintMessage;
  const escapedText = pylintMessage.message.replace(/_/g, "\\_");

  return {
    message: truncateIssueMessage(escapedText),
    severity: messageTypeToSeverity(pylintMessage.type),
    source: {
      file: pylintMessage.path,
      position: {
        startLine: line,
        startColumn: column + 1,
        ...(endLine == null ? {} : { endLine }),
        ...(endColumn == null ? {} : { endColumn: endColumn + 1 }),
      },
    },
  };
}
/** Issue severity assigned to each PyLint message type. */
const SEVERITY_BY_MESSAGE_TYPE: Readonly<
  Record<PylintMessageType, IssueSeverity>
> = {
  fatal: "error",
  error: "error",
  warning: "warning",
  refactor: "info",
  convention: "info",
  info: "info",
};

/**
 * Maps a PyLint message type onto an issue severity: fatal and error become
 * "error", warning stays "warning", everything else becomes "info".
 *
 * @param type PyLint message type.
 * @returns The corresponding issue severity.
 */
function messageTypeToSeverity(type: PylintMessageType): IssueSeverity {
  return SEVERITY_BY_MESSAGE_TYPE[type];
}
code-pushup.poetry.plugin.ts
import type { AuditOutput, Issue, PluginConfig } from '@code-pushup/models';
import { executeProcess, pluralizeToken } from '@code-pushup/utils';
import { diff } from 'semver';
/**
 * Creates the Code PushUp plugin configuration for Poetry.
 *
 * Declares two audits, `vulnerabilities` and `outdated`; the runner starts
 * `runAudit` and `runOutdated` together and resolves with both outputs in
 * that order.
 *
 * @returns Plugin config with slug `poetry`.
 */
export default function poetryPlugin(): PluginConfig {
  const audits: PluginConfig['audits'] = [
    {
      slug: 'vulnerabilities',
      title: 'Vulnerabilities',
      description: 'Checks for security vulnerabilities in dependencies.',
    },
    {
      slug: 'outdated',
      title: 'Outdated versions',
      description: 'Checks for outdated packages.',
    },
  ];
  const runner = () => Promise.all([runAudit(), runOutdated()]);

  return {
    slug: 'poetry',
    title: 'Poetry',
    icon: 'poetry',
    audits,
    runner,
  };
}
/**
 * Produces the `vulnerabilities` audit output.
 *
 * The score is the share of listed packages that have no reported
 * vulnerability, clamped to the 0..1 range. Every individual vulnerability of
 * every affected package becomes one issue with severity "error".
 *
 * @returns Audit output whose value is the number of vulnerable packages.
 */
async function runAudit(): Promise<AuditOutput> {
  // The two commands are independent, so run them concurrently.
  const [{ vulnerabilities }, packages] = await Promise.all([
    runPoetryAudit(),
    runPoetryShow(),
  ]);

  const vulnerableCount = vulnerabilities.length;
  // With no packages listed the ratio would be 0/0 (NaN); fall back to a
  // perfect score unless vulnerabilities were reported anyway.
  const score =
    packages.length === 0
      ? Number(vulnerableCount === 0)
      : Math.max(0, (packages.length - vulnerableCount) / packages.length);

  return {
    slug: 'vulnerabilities',
    score,
    value: vulnerableCount,
    displayValue: pluralizeToken('vulnerability', vulnerableCount),
    details: {
      issues: vulnerabilities.flatMap(({ name, installedVersion, vulns }) =>
        vulns.map(
          ({ advisory, affectedVersion, cve }): Issue => ({
            message: `\`${name}\` has _${cve}_ vulnerability which affects versions **${affectedVersion}** (installed version is **${installedVersion}**). ${advisory}`,
            severity: 'error',
          })
        )
      ),
    },
  };
}
/**
 * Produces the `outdated` audit output.
 *
 * Each outdated package whose versions can be compared becomes an issue:
 * a major update is an "error", a minor update a "warning", anything else
 * "info". Packages whose version strings are rejected as invalid by `diff`
 * are skipped. The score is the share of all packages that do not need a
 * major update, clamped to the 0..1 range.
 *
 * @returns Audit output whose value is the number of outdated packages listed.
 */
async function runOutdated(): Promise<AuditOutput> {
  // The two listings are independent, so fetch them concurrently.
  const [allPackages, outdatedPackages] = await Promise.all([
    runPoetryShow(),
    runPoetryShow(true),
  ]);

  const issues = outdatedPackages.flatMap(
    ({ name, version, latest }): Issue[] => {
      try {
        const level = diff(version, latest);
        if (level == null) {
          return [];
        }
        return [
          {
            message: `\`${name}\` requires a **${level}** update from **${version}** to **${latest}**`,
            severity:
              level === 'major'
                ? 'error'
                : level === 'minor'
                  ? 'warning'
                  : 'info',
          },
        ];
      } catch (err: unknown) {
        // ignore errors like: `Invalid Version: 6.4.post2`
        if (
          err instanceof TypeError &&
          err.message.startsWith('Invalid Version')
        ) {
          return [];
        }
        throw err;
      }
    }
  );

  const majorOutdatedCount = issues.filter(
    issue => issue.severity === 'error'
  ).length;
  // With no packages listed the ratio would be 0/0 (NaN); the clamp keeps the
  // score non-negative if the two listings disagree.
  const score =
    allPackages.length === 0
      ? Number(majorOutdatedCount === 0)
      : Math.max(
          0,
          (allPackages.length - majorOutdatedCount) / allPackages.length
        );

  return {
    slug: 'outdated',
    score,
    value: outdatedPackages.length,
    displayValue: pluralizeToken('outdated package', outdatedPackages.length),
    details: {
      issues,
    },
  };
}
/**
 * Shape that `runPoetryAudit` assumes for the JSON it parses from the stdout
 * of `poetry audit --json`. The parsed value is cast to this type without
 * runtime validation.
 */
type PoetryAudit = {
// One entry per affected package; read by `runAudit`.
vulnerabilities: PoetryAuditVulnerability[];
// Not read by the code shown here.
metadata: {
auditVersion: string;
'poetry.lock': {
updated: boolean;
// NOTE(review): typed as the literal `true` while `updated` is `boolean` —
// confirm whether `boolean` was intended.
fresh: true;
};
};
};
/**
 * A package listed in `PoetryAudit.vulnerabilities`, together with the
 * individual vulnerabilities reported for it. `runAudit` emits one issue per
 * entry of `vulns`.
 */
type PoetryAuditVulnerability = {
name: string;
installedVersion: string;
vulns: {
// All three fields are interpolated into the issue message by `runAudit`.
cve: string;
affectedVersion: string;
advisory: string;
}[];
};
/**
 * A package row parsed by `runPoetryShow` from `poetry show` output.
 *
 * When `T` is `true` (the `--outdated` listing) the row additionally carries
 * the `latest` version; otherwise only the three base fields are present.
 */
type PoetryShowPackage<T extends boolean> = {
name: string;
version: string;
description: string;
} & (T extends true ? { latest: string } : {});
/**
 * Runs `poetry audit --json` and returns its parsed report.
 *
 * Exit code 0 (no vulnerabilities found) and exit code 1 (vulnerabilities
 * found) both come with a report on stdout and are treated as success; any
 * other exit code, or any stderr output, is treated as a failure.
 *
 * @returns The parsed report; its shape is assumed, not validated.
 * @throws Error carrying stderr (or stdout) when the command failed.
 * @throws SyntaxError when stdout is not valid JSON.
 */
async function runPoetryAudit(): Promise<PoetryAudit> {
  const { code, stdout, stderr } = await executeProcess({
    command: 'poetry',
    args: ['audit', '--json'],
    ignoreExitCode: true,
  });
  // Requiring exit code 1 would reject the clean "nothing found" run.
  const completed = code === 0 || code === 1;
  if (!completed || stderr) {
    throw new Error(
      stderr || stdout || `poetry audit exited with code ${String(code)}`
    );
  }
  return JSON.parse(stdout) as PoetryAudit;
}
/**
 * Lists packages by parsing the whitespace-separated columns printed by
 * `poetry show` (or `poetry show --outdated`).
 *
 * Expected columns are name, version, latest version (outdated listing only)
 * and description. Lines that do not fit this layout are skipped. Package
 * names may contain dots, versions only need to start with a digit, and an
 * optional `(!)` marker between name and version is tolerated.
 *
 * @param outdated When truthy, passes `--outdated` and parses the extra
 *   `latest` column.
 * @returns Parsed packages in output order.
 */
async function runPoetryShow<T extends boolean>(
  outdated?: T
): Promise<PoetryShowPackage<T>[]> {
  const { stdout } = await executeProcess({
    command: 'poetry',
    args: ['show', ...(outdated ? ['--outdated'] : [])],
    ignoreExitCode: true,
  });

  // A version is any token starting with a digit, so two-part versions such as
  // `2024.1` are not dropped. The description column is optional.
  const rowPattern = outdated
    ? /^([\w.-]+)\s+(?:\(!\)\s+)?(\d\S*)\s+(\d\S*)(?:\s+(.*))?$/
    : /^([\w.-]+)\s+(?:\(!\)\s+)?(\d\S*)(?:\s+(.*))?$/;

  return stdout.split(/\r?\n/).flatMap((line): PoetryShowPackage<T>[] => {
    const match = rowPattern.exec(line.trimEnd());
    if (!match) {
      return [];
    }
    if (outdated) {
      const [, name, version, latest, description = ''] = match;
      return [{ name, version, latest, description } as PoetryShowPackage<T>];
    }
    const [, name, version, description = ''] = match;
    return [{ name, version, description } as PoetryShowPackage<T>];
  });
}
code-pushup.config.ts
import coveragePlugin from '@code-pushup/coverage-plugin';
import type { CoreConfig } from '@code-pushup/models';
import poetryPlugin from './code-pushup.poetry.plugin';
import pylintPlugin from './code-pushup.pylint.plugin';
import 'dotenv/config';
// Code PushUp configuration combining the coverage plugin with the two custom
// Python plugins (PyLint and Poetry) defined in the sibling plugin files.
const config: CoreConfig = {
plugins: [
// Line coverage read from an lcov report.
await coveragePlugin({
reports: ['../reports/lcov.info'],
coverageTypes: ['line'],
}),
// PyLint is pointed at the `../kw` path; awaited because the plugin factory is async.
await pylintPlugin('../kw'),
poetryPlugin(),
],
categories: [
// PyLint "error" findings weigh five times as much as "warning" findings here.
{
slug: 'bug-prevention',
title: 'Bug prevention',
description: "PyLint rules that find **potential bugs** in your code.",
refs: [
{ type: 'group', plugin: 'pylint', slug: 'error', weight: 5 },
{ type: 'group', plugin: 'pylint', slug: 'warning', weight: 1 },
],
},
{
slug: 'code-style',
title: 'Code style',
description: "PyLint rules that promote **good practices** and consistency in your code.",
refs: [
{ type: 'group', plugin: 'pylint', slug: 'refactor', weight: 1 },
{ type: 'group', plugin: 'pylint', slug: 'convention', weight: 1 },
// NOTE(review): weight 0 presumably lists the group without letting it
// influence the category score — confirm.
{ type: 'group', plugin: 'pylint', slug: 'info', weight: 0 },
],
},
{
slug: 'coverage',
title: 'Code coverage',
description: 'How many lines of code are **covered by tests** with pytest.',
refs: [
{ type: 'audit', plugin: 'coverage', slug: 'line-coverage', weight: 1 },
],
},
{
slug: 'security',
title: 'Security',
description: "Finds known **vulnerabilities** in Poetry packages.",
refs: [
{ type: 'audit', plugin: 'poetry', slug: 'vulnerabilities', weight: 1 },
],
},
{
slug: 'updates',
title: 'Updates',
description: "Finds **outdated** Poetry packages.",
refs: [{ type: 'audit', plugin: 'poetry', slug: 'outdated', weight: 1 }],
},
],
// The upload section is only added when the CP_API_KEY environment variable is set.
...(process.env.CP_API_KEY && {
upload: {
server: 'https://api.codepushup.skypicker.com/graphql',
apiKey: process.env.CP_API_KEY,
organization: 'kiwi',
project: 'autobooking-backbone',
},
}),
};
export default config;
@vmasek I'd suggest closing this research issue in favour of creating new issues to build the official Python plugins based on this prototype.