
[WIP][SPARK-48752][PYTHON][CONNECT] Introduce `pyspark.logging` for improved structured logging for PySpark

Open itholic opened this issue 1 year ago • 0 comments

What changes were proposed in this pull request?

This PR introduces the pyspark.logging module to facilitate structured client-side logging for PySpark users.

This module includes a PySparkLogger class that provides several methods for logging messages at different levels in a structured JSON format:

  • PySparkLogger.log_info
  • PySparkLogger.log_warn
  • PySparkLogger.log_error

The logger can be easily configured to write logs to either the console or a specified file.
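For reviewers who want a feel for the console-or-file choice, here is a minimal sketch using only the Python standard library. It is not the implementation in this PR: `JsonFormatter` and `make_logger` are hypothetical names made up for illustration, and the field names simply mirror the JSON examples in this description.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Hypothetical formatter: renders each record as one JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps(
            {
                "timestamp": self.formatTime(record),
                "level": record.levelname,
                "name": record.name,
                "message": record.getMessage(),
            }
        )


def make_logger(name, stream=None, filename=None):
    """Hypothetical helper: log JSON to a file if given, else to a stream."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the root logger from printing a duplicate
    if filename:
        handler = logging.FileHandler(filename)
    else:
        # StreamHandler(None) falls back to sys.stderr, i.e. the console.
        handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.handlers = [handler]
    return logger


# Write to an in-memory stream here so the result can be inspected.
buffer = io.StringIO()
make_logger("ConsoleOrFileDemo", stream=buffer).info("hello")
record = json.loads(buffer.getvalue())
```

Passing `filename="app.log"` instead of `stream` would route the same JSON lines to a file; the formatter is shared, so only the handler changes.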

DataFrame error log improvement

This PR also improves the DataFrame API error logs by leveraging this new logging framework:

Before

Structured logging was introduced in https://github.com/apache/spark/pull/45729, but PySpark-related messages are still hard to find in the current structured log: they are buried among complex JVM stack traces, and the format is not very Python-friendly:

{
  "ts": "2024-06-28T10:53:48.528Z",
  "level": "ERROR",
  "msg": "Exception in task 7.0 in stage 0.0 (TID 7)",
  "context": {
    "task_name": "task 7.0 in stage 0.0 (TID 7)"
  },
  "exception": {
    "class": "org.apache.spark.SparkArithmeticException",
    "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n",
    "stacktrace": [
      {
        "class": "org.apache.spark.sql.errors.QueryExecutionErrors$",
        "method": "divideByZeroError",
        "file": "QueryExecutionErrors.scala",
        "line": 203
      },
      {
        "class": "org.apache.spark.sql.errors.QueryExecutionErrors",
        "method": "divideByZeroError",
        "file": "QueryExecutionErrors.scala",
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1",
        "method": "project_doConsume_0$",
        "file": null,
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1",
        "method": "processNext",
        "file": null,
        "line": -1
      },
      {
        "class": "org.apache.spark.sql.execution.BufferedRowIterator",
        "method": "hasNext",
        "file": "BufferedRowIterator.java",
        "line": 43
      },
      {
        "class": "org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1",
        "method": "hasNext",
        "file": "WholeStageCodegenEvaluatorFactory.scala",
        "line": 50
      },
      {
        "class": "org.apache.spark.sql.execution.SparkPlan",
        "method": "$anonfun$getByteArrayRdd$1",
        "file": "SparkPlan.scala",
        "line": 388
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "$anonfun$mapPartitionsInternal$2",
        "file": "RDD.scala",
        "line": 896
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "$anonfun$mapPartitionsInternal$2$adapted",
        "file": "RDD.scala",
        "line": 896
      },
      {
        "class": "org.apache.spark.rdd.MapPartitionsRDD",
        "method": "compute",
        "file": "MapPartitionsRDD.scala",
        "line": 52
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "computeOrReadCheckpoint",
        "file": "RDD.scala",
        "line": 369
      },
      {
        "class": "org.apache.spark.rdd.RDD",
        "method": "iterator",
        "file": "RDD.scala",
        "line": 333
      },
      {
        "class": "org.apache.spark.scheduler.ResultTask",
        "method": "runTask",
        "file": "ResultTask.scala",
        "line": 93
      },
      {
        "class": "org.apache.spark.TaskContext",
        "method": "runTaskWithListeners",
        "file": "TaskContext.scala",
        "line": 171
      },
      {
        "class": "org.apache.spark.scheduler.Task",
        "method": "run",
        "file": "Task.scala",
        "line": 146
      },
      {
        "class": "org.apache.spark.executor.Executor$TaskRunner",
        "method": "$anonfun$run$5",
        "file": "Executor.scala",
        "line": 644
      },
      {
        "class": "org.apache.spark.util.SparkErrorUtils",
        "method": "tryWithSafeFinally",
        "file": "SparkErrorUtils.scala",
        "line": 64
      },
      {
        "class": "org.apache.spark.util.SparkErrorUtils",
        "method": "tryWithSafeFinally$",
        "file": "SparkErrorUtils.scala",
        "line": 61
      },
      {
        "class": "org.apache.spark.util.Utils$",
        "method": "tryWithSafeFinally",
        "file": "Utils.scala",
        "line": 99
      },
      {
        "class": "org.apache.spark.executor.Executor$TaskRunner",
        "method": "run",
        "file": "Executor.scala",
        "line": 647
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor",
        "method": "runWorker",
        "file": "ThreadPoolExecutor.java",
        "line": 1136
      },
      {
        "class": "java.util.concurrent.ThreadPoolExecutor$Worker",
        "method": "run",
        "file": "ThreadPoolExecutor.java",
        "line": 635
      },
      {
        "class": "java.lang.Thread",
        "method": "run",
        "file": "Thread.java",
        "line": 840
      }
    ]
  },
  "logger": "Executor"
}

After

Now we get an improved, simplified, and Python-friendly error log for DataFrame errors:

{
  "timestamp": "2024-06-28 19:53:48,563",
  "level": "ERROR",
  "name": "DataFrameQueryContextLogger",
  "message": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n", 
  "context": {
    "file": "/.../spark/python/test_error_context.py",
    "line_no": "17",
    "fragment": "__truediv__"
  },
  "error_class": "DIVIDE_BY_ZERO"
}
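Because the Python-side context is now a small, flat object, a log consumer can pull out the failing call site with plain `json`. The sketch below assumes a log line shaped like the example above; the message is shortened for brevity and the elided path is kept exactly as shown.

```python
import json

# One log line in the "After" shape; the message is shortened here.
log_line = (
    '{"level": "ERROR", "name": "DataFrameQueryContextLogger", '
    '"message": "[DIVIDE_BY_ZERO] Division by zero.", '
    '"context": {"file": "/.../spark/python/test_error_context.py", '
    '"line_no": "17", "fragment": "__truediv__"}, '
    '"error_class": "DIVIDE_BY_ZERO"}'
)

entry = json.loads(log_line)
ctx = entry["context"]

# Build a one-line summary: error class, offending call, and source location.
location = f'{ctx["file"]}:{ctx["line_no"]}'
summary = f'{entry["error_class"]} in {ctx["fragment"]} at {location}'
print(summary)
```

Doing the same with the "Before" log would mean parsing the `== DataFrame ==` section out of the exception message string.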

Why are the changes needed?

Before

Currently there is no dedicated PySpark logging module, so users have to set up and customize the Python logging module manually, for example:

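import logging
# Without basicConfig, the default WARNING level would drop this INFO record.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("TestLogger")
user = "test_user"
action = "test_action"
logger.info(f"User {user} takes an {action}")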

This logs the information as a plain string:

INFO:TestLogger:User test_user takes an test_action

This is not very actionable, and it is hard to analyze because it is not well-structured.

Alternatively, we can use Log4j from the JVM, but that results in excessively detailed logs, as shown in the example above, and this approach cannot be applied to Spark Connect at all.

After

We can simply import and use PySparkLogger with minimal setup:

from pyspark.logging import PySparkLogger
logger = PySparkLogger.get_logger("TestLogger")
user = "test_user"
action = "test_action"
logger.log_info(f"User {user} takes an {action}", user=user, action=action)

This logs the information in the following JSON format:

{
  "timestamp": "2024-06-28 19:44:19,030",
  "level": "INFO",
  "name": "TestLogger",
  "message": "User test_user takes an test_action",
  "user": "test_user",
  "action": "test_action"
}

NOTE: each logging method accepts any number of keyword arguments (user and action in the example above). Each keyword argument's key and value appear as the same key and value in the JSON log entry, which makes the log easy to track.
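One way the keyword-argument behavior could work is sketched below with the standard library only. This is an assumption about the mechanism, not the code in this PR: `StructuredLoggerSketch` and `KwargsJsonFormatter` are hypothetical names, and the sketch passes the keyword arguments through the standard `extra` parameter of `logging`.

```python
import io
import json
import logging


class KwargsJsonFormatter(logging.Formatter):
    """Hypothetical formatter: merges per-call kwargs into the JSON entry."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "name": record.name,
            "message": record.getMessage(),
        }
        # "kwargs" is attached to the record via `extra` in log_info below.
        payload.update(getattr(record, "kwargs", {}))
        return json.dumps(payload)


class StructuredLoggerSketch:
    """Hypothetical wrapper; only mimics the call shape described above."""

    def __init__(self, name, stream):
        self._logger = logging.getLogger(name)
        self._logger.setLevel(logging.INFO)
        self._logger.propagate = False
        handler = logging.StreamHandler(stream)
        handler.setFormatter(KwargsJsonFormatter())
        self._logger.handlers = [handler]

    def log_info(self, message, **kwargs):
        # Each keyword argument becomes a top-level key in the JSON entry.
        self._logger.info(message, extra={"kwargs": kwargs})


buffer = io.StringIO()
logger = StructuredLoggerSketch("TestLogger", buffer)
user = "test_user"
action = "test_action"
logger.log_info(f"User {user} takes an {action}", user=user, action=action)
entry = json.loads(buffer.getvalue())
```

Note that in this sketch a keyword named like a built-in field (for example `level`) would overwrite it; a real implementation would need to decide how to handle such collisions.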

Does this PR introduce any user-facing change?

No API changes, but the PySpark client-side logging is improved.

How was this patch tested?

Added UTs.

Was this patch authored or co-authored using generative AI tooling?

No.

itholic avatar Jun 28 '24 11:06 itholic