modular icon indicating copy to clipboard operation
modular copied to clipboard

[BUG]: algorithm.parallelize crashes program

Open elvispresniy opened this issue 2 years ago • 4 comments

Bug description

0.00033999278648190045
12.336449968250383 GFLOP/s
[114667:114667:20231209,143503.671761:ERROR file_io_posix.cc:144] open /sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq: No such file or directory (2)
[114667:114667:20231209,143503.671869:ERROR file_io_posix.cc:144] open /sys/devices/system/cpu/cpu0/cpufreq/scaling_max_freq: No such file or directory (2)
Stack dump:
0. Program arguments: mojo main.mojo

The program started crashing when I added a for loop to main.

Steps to reproduce

from algorithm import parallelize, vectorize
from tensor import Tensor
from utils.index import Index
from sys.info import simdwidthof
from random import rand
import benchmark

# Element type used for every tensor in this reproducer.
alias type = DType.float32
# SIMD width obtained from simdwidthof for `type`; passed to vectorize below.
alias nelts = simdwidthof[type]()

# Matrix dimensions: bench() creates A as (M, K), B as (K, N) and C as (M, N).
alias M = 128
alias N = 128
alias K = 128

# Benchmark harness: creates the three tensors, times `func(C, A, B)` with
# benchmark.run, then prints the mean runtime in seconds followed by the
# throughput in GFLOP/s (computed from 2 * M * N * K operations per call).
@always_inline
fn bench[
    func: fn (inout Tensor[type], Tensor[type], Tensor[type]) -> None]():
    var A = rand[type](M, K)
    var B = rand[type](K, N)
    var C = Tensor[type](M, N)

    # Closure handed to the benchmark runner; it captures A, B and C from the
    # enclosing scope.
    @always_inline
    @parameter
    fn test_fn():
        _ = func(C, A, B)

    # NOTE(review): A, B and C are not referenced again after this call. A
    # later comment in this thread attributes the crash to early destruction
    # of C and reports that inserting `_ = C` avoids it - presumably right
    # after this line; confirm before relying on it.
    let secs = benchmark.run[test_fn](max_runtime_secs=1).mean()
    let gflops = ((2 * M * N * K) / secs) / 1e9
    print(secs)
    print(gflops, "GFLOP/s")

fn matmul_parallelized(inout C: Tensor[type], A: Tensor[type], B: Tensor[type]):
    """Accumulate the matrix product of A and B into C, one row per task.

    The tensors are addressed through flat offsets, treating C as M x N,
    A as M x K and B as K x N with consecutive rows stored back to back
    (assumes the row-major layout the original offsets already relied on).
    C is accumulated into, not overwritten: this function never zeroes it.

    Args:
        C: Output tensor, updated in place.
        A: Left operand.
        B: Right operand.
    """
    @parameter
    fn calc_row(m: Int):
        for k in range(K):
            @parameter
            fn dot[nelts: Int](n: Int):
                # Row strides follow each tensor's own column count (N for C
                # and B, K for A) so the offsets stay correct when M, N and K
                # differ. A contributes the single element A[m, k], loaded at
                # width 1 and broadcast across the lanes of B's row slice;
                # loading `nelts` elements of A here would multiply by the
                # wrong values and read past the end of A on the last row.
                C.simd_store[nelts](
                    m * N + n,
                    C.simd_load[nelts](m * N + n)
                    + A.simd_load[1](m * K + k) * B.simd_load[nelts](k * N + n),
                )
            vectorize[nelts, dot](N)
    parallelize[calc_row](M, M)

# Entry point: runs the benchmark five times in a row. According to the
# report, the crash started once this loop was added.
fn main():
    for i in range(5):
        bench[matmul_parallelized]()

System information

- What OS did you install Mojo on?
* Windows 11
- Provide version information for Mojo by pasting the output of `mojo -v`
* mojo 0.6.0 (d55c0025)
- Provide Modular CLI version by pasting the output of `modular -v`
* modular 0.2.2 (95d42445)

elvispresniy avatar Dec 09 '23 09:12 elvispresniy

I'm not able to reproduce the crash. Does it always crash for you or just sometimes? Do you get the correct output from running the program aside from the "/sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq" errors?

ematejska avatar Dec 12 '23 17:12 ematejska

@ematejska I reproduced the crash on Docker, Intel Mac. I'm not sure whether the results are correct; however, some results did get produced before the crash. The trace reads:

0.0014979107597069598
2.8001027249584229 GFLOP/s
0.0018947687330543932
2.2136231862126645 GFLOP/s
[45173:45173:20231213,024209.681814:ERROR file_io_posix.cc:144] open /sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq: No such file or directory (2)
[45173:45173:20231213,024209.681932:ERROR file_io_posix.cc:144] open /sys/devices/system/cpu/cpu0/cpufreq/scaling_max_freq: No such file or directory (2)
[45173:45174:20231213,024209.684584:ERROR directory_reader_posix.cc:42] opendir /home/vscode/.modular/crashdb/attachments/4220a3bb-b1e2-4b6a-b6aa-e715745514ce: No such file or directory (2)
Please submit a bug report to https://github.com/modularml/mojo/issues and include the crash backtrace along with all the relevant source codes.
Stack dump:
0.      Program arguments: /home/vscode/.modular/pkg/packages.modular.com_mojo/bin/mojo /workspaces/ubuntu/issues/1450.mojo
^C

soraros avatar Dec 13 '23 02:12 soraros

It's related to early destruction of C (inserting _ = C fixes the problem). It only crashes with certain tensor shapes.

from algorithm import parallelize
from tensor import Tensor
import benchmark

# Minimal reproducer: a tensor captured by a closure that is run through both
# benchmark.run and parallelize, with no other use of the tensor afterwards.
fn main():
  var t = Tensor[DType.float32](100, 100)

  @parameter
  fn test_fn():
    # Worker body: writes 0.0 into element `m` of the captured tensor.
    @parameter
    fn calc_row(m: Int):
      t.simd_store[1](m, 0.0)
    # NOTE(review): presumably 1 work item spread over 10 workers - confirm
    # against the parallelize signature.
    parallelize[calc_row](1, 10)

  # `t` is not referenced after this call; only the mean runtime is printed.
  let secs = benchmark.run[test_fn](max_runtime_secs=0.1).mean()
  print(secs)

soraros avatar Jan 28 '24 02:01 soraros

Less minimal example:

from runtime.asyncrt import DeviceContextPtr, TaskGroup, parallelism_level
from os.atomic import Atomic
from time import sleep

fn run[
    thread_a_part1: fn () capturing -> Int,
    thread_a_part2: fn () capturing -> Int,
    thread_b_part1: fn () capturing -> Int,
    thread_b_part2: fn () capturing -> Int,
]():
    """Alternate work between two tasks, a and b.

    While task a is doing part1, task b is doing part2, and then they flip.
    Turns are handed over through four 0/1 flags: a task runs a part only
    when the other task's matching flag is 1, resets that flag to 0, and
    sets its own flag to 1 when the part has finished.

    A part function that returns a negative value requests shutdown; each
    loop exits once it observes the shutdown counter above zero.

    Parameters:
        thread_a_part1: Part 1 of task a's work.
        thread_a_part2: Part 2 of task a's work.
        thread_b_part1: Part 1 of task b's work.
        thread_b_part2: Part 2 of task b's work.
    """
    # The b_*_done flags start at 1 so task a takes the first turn at each
    # part; task b waits until a has finished the corresponding part.
    var a_1_done = Atomic[DType.uint8](0)
    var a_2_done = Atomic[DType.uint8](0)
    var b_1_done = Atomic[DType.uint8](1)
    var b_2_done = Atomic[DType.uint8](1)
    var shutdown = Atomic[DType.uint8](0)
    # Pause between polling rounds, passed to sleep() (presumably seconds).
    alias SLEEP_TIME = 0.1

    @parameter
    @always_inline
    async fn thread_a():
        while True:
            # b finished its part 2: take the turn, run a's part 2, hand over.
            if b_2_done.load() == 1:
                _ = b_2_done.fetch_sub(1)
                if thread_a_part2() < 0:
                    _ = shutdown.fetch_add(1)
                _ = a_2_done.fetch_add(1)

            # Checked between the two parts so a shutdown request stops the
            # loop before another part 1 is started.
            if shutdown.load() > 0:
                print("shutting down a")
                break

            # b finished its part 1: take the turn, run a's part 1, hand over.
            if b_1_done.load() == 1:
                _ = b_1_done.fetch_sub(1)
                if thread_a_part1() < 0:
                    _ = shutdown.fetch_add(1)
                _ = a_1_done.fetch_add(1)

            sleep(SLEEP_TIME)

    @parameter
    @always_inline
    async fn thread_b():
        while True:
            # a finished its part 2: take the turn, run b's part 2, hand over.
            if a_2_done.load() == 1:
                _ = a_2_done.fetch_sub(1)
                if thread_b_part2() < 0:
                    _ = shutdown.fetch_add(1)
                _ = b_2_done.fetch_add(1)

            if shutdown.load() > 0:
                print("shutting down b")
                break

            # a finished its part 1: take the turn, run b's part 1, hand over.
            if a_1_done.load() == 1:
                _ = a_1_done.fetch_sub(1)
                if thread_b_part1() < 0:
                    _ = shutdown.fetch_add(1)
                _ = b_1_done.fetch_add(1)

            sleep(SLEEP_TIME)

    # Schedule both loops on one task group and block until both have exited.
    var tg = TaskGroup()
    tg.create_task(thread_a())
    tg.create_task(thread_b())
    tg.wait()


# Driver for run(): moves values out of `from_values` into two per-task
# lists via the part1 closures and prints/clears them in the part2 closures.
def main():
    var from_values = List[Int](1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    var a_to_values = List[Int]()
    var b_to_values = List[Int]()

    # Part 1 for task a: pop one value from the shared source list into a's
    # list; returns -1 once the source list is empty, which run() treats as
    # a shutdown request.
    @parameter
    fn a_part1() capturing -> Int:
        if len(from_values) > 0:
            a_to_values.append(from_values.pop())
            return 0
        else:
            return -1

    # Part 1 for task b: same as a_part1, but appends to b's list.
    @parameter
    fn b_part1() capturing -> Int:
        if len(from_values) > 0:
            b_to_values.append(from_values.pop())
            return 0
        else:
            return -1

    # Part 2 for task a: print the first pending value (if any) and clear
    # a's list; always returns 0, so it never requests shutdown.
    @parameter
    fn a_part2() capturing -> Int:
        if len(a_to_values) > 0:
            print("a", a_to_values[0])
            a_to_values.clear()
        return 0

    # Part 2 for task b: same as a_part2, but for b's list.
    @parameter
    fn b_part2() capturing -> Int:
        if len(b_to_values) > 0:
            print("b", b_to_values[0])
            b_to_values.clear()
        return 0

    run[a_part1, a_part2, b_part1, b_part2]()


    # Keep the captured lists referenced after run() returns. Without
    # extending these variables, I get the same opaque compiler error as
    # reported above.
    _ = a_to_values
    _ = b_to_values
    _ = from_values

sstadick avatar May 20 '25 19:05 sstadick