1brc icon indicating copy to clipboard operation
1brc copied to clipboard

I ran your code on Dual EPYC 9354 128 threads machines

Open lehuyduc opened this issue 1 year ago • 3 comments

// https://github.com/gunnarmorling/1brc/discussions/46
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <time.h>

// Capacity of our hashmap
// Since we use linear probing this needs to be at least twice as big
// as the # of distinct strings in our dataset
// Also must be power of 2 so we can use bit-and instead of modulo
#define HCAP (4096 * 4)
#define MAX_DISTINCT_GROUPS 16384
#define MAX_GROUPBY_KEY_LENGTH 100
#define NTHREADS 128

// branchless min/max (on some machines at least)
#define min(a, b) (a ^ ((b ^ a) & -(b < a)));
#define max(a, b) (a ^ ((a ^ b) & -(a < b)));

// parses a floating point number as an integer
// this is only possible because we know our data file has only a single decimal
static inline const char *parse_number(int *dest, const char *s) {

  // parse sign
  int mod;
  if (*s == '-') {
    mod = -1;
    s++;
  } else {
    mod = 1;
  }

  if (s[1] == '.') {
    *dest = ((s[0] * 10) + s[2] - ('0' * 11)) * mod;
    return s + 4;
  }

  *dest = (s[0] * 100 + s[1] * 10 + s[3] - '0' * 111) * mod;
  return s + 5;
}

// hash returns a simple (but fast) hash for the first n bytes of data
static unsigned int hash(const unsigned char *data, int n) {
  unsigned int hash = 0;

  for (int i = 0; i < n; i++) {
    hash = (hash * 31) + data[i];
  }

  return hash;
}

struct Group {
  unsigned int count;
  long sum;
  int min;
  int max;
  char *label;
};

struct Result {
  int map[HCAP];
  int n;
  char labels[MAX_DISTINCT_GROUPS][MAX_GROUPBY_KEY_LENGTH];
  struct Group groups[MAX_DISTINCT_GROUPS];
};

struct Chunk {
  size_t start;
  size_t end;
  const char *data;
};

// qsort callback
static int cmp(const void *ptr_a, const void *ptr_b) {
  return strcmp(((struct Group *)ptr_a)->label, ((struct Group *)ptr_b)->label);
}

static inline unsigned int
hash_probe(int map[HCAP],
           char groups[MAX_DISTINCT_GROUPS][MAX_GROUPBY_KEY_LENGTH],
           const char *start, int len) {
  // probe map until free spot or match
  unsigned int h = hash((unsigned char *)start, len) & (HCAP - 1);
  while (map[h] >= 0 && memcmp(groups[map[h]], start, (size_t)len) != 0) {
    h = (h + 1) & (HCAP - 1);
  }

  return h;
}

static void *process_chunk(void *ptr) {
  struct Chunk *ch = (struct Chunk *)ptr;

  // skip start forward until SOF or after next newline
  if (ch->start > 0) {
    while (ch->data[ch->start - 1] != '\n') {
      ch->start++;
    }
  }

  while (ch->data[ch->end] != 0x0 && ch->data[ch->end - 1] != '\n') {
    ch->end++;
  }

  struct Result *result = malloc(sizeof(*result));
  if (!result) {
    perror("malloc error");
    exit(EXIT_FAILURE);
  }
  result->n = 0;
  memset(result->labels, 0,
         MAX_DISTINCT_GROUPS * MAX_GROUPBY_KEY_LENGTH * sizeof(char));
  memset(result->map, -1, HCAP * sizeof(int));

  const char *s = &ch->data[ch->start];
  const char *end = &ch->data[ch->end];
  const char *linestart;
  unsigned int h;
  int temperature;
  int len;
  int c;

  while (s != end) {
    linestart = s;

    // hash everything up to ';'
    // assumption: key is at least 1 char
    len = 1;
    h = (unsigned char)s[0];
    while (s[len] != ';') {
      h = (h * 31) + (unsigned char)s[len++];
    }

    // parse decimal number as int
    s = parse_number(&temperature, s + len + 1);

    // probe map until free spot or match
    h = h & (HCAP - 1);
    while (result->map[h] >= 0 && memcmp(result->labels[result->map[h]],
                                         linestart, (size_t)len) != 0) {
      h = (h + 1) & (HCAP - 1);
    }
    c = result->map[h];

    if (c < 0) {
      memcpy(result->labels[result->n], linestart, (size_t)len);
      result->labels[result->n][len] = 0x0;
      result->groups[result->n].label = result->labels[result->n];
      result->groups[result->n].count = 1;
      result->groups[result->n].sum = temperature;
      result->groups[result->n].min = temperature;
      result->groups[result->n].max = temperature;
      result->map[h] = result->n++;
    } else {
      result->groups[c].count += 1;
      result->groups[c].sum += temperature;
      result->groups[c].min = min(result->groups[c].min, temperature);
      result->groups[c].max = max(result->groups[c].max, temperature);
    }
  }

  return (void *)result;
}

void result_to_str(char *dest, const struct Result *result) {
  char buf[128];
  *dest++ = '{';
  for (int i = 0; i < result->n; i++) {
    size_t n = (size_t)sprintf(
        buf, "%s=%.1f/%.1f/%.1f", result->groups[i].label,
        (float)result->groups[i].min / 10.0,
        ((float)result->groups[i].sum / (float)result->groups[i].count) / 10.0,
        (float)result->groups[i].max / 10.0);

    memcpy(dest, buf, n);
    if (i < result->n - 1) {
      memcpy(dest + n, ", ", 2);
      n += 2;
    }

    dest += n;
  }
  *dest++ = '}';
  *dest = 0x0;
}

int main(int argc, char **argv) {
  struct timespec start_time, end_time;
  clock_gettime(CLOCK_MONOTONIC, &start_time);

  char *file = "measurements.txt";
  if (argc > 1) {
    file = argv[1];
  }

  int fd = open(file, O_RDONLY);
  if (!fd) {
    perror("error opening file");
    exit(EXIT_FAILURE);
  }

  struct stat sb;
  if (fstat(fd, &sb) == -1) {
    perror("error getting file size");
    exit(EXIT_FAILURE);
  }

  // mmap entire file into memory
  size_t sz = (size_t)sb.st_size;
  const char *data = mmap(NULL, sz, PROT_READ, MAP_PRIVATE, fd, 0);
  if (data == MAP_FAILED) {
    perror("error mmapping file");
    exit(EXIT_FAILURE);
  }

  // distribute work among N worker threads
  pthread_t workers[NTHREADS];
  struct Chunk chunks[NTHREADS];
  size_t chunk_size = sz / (size_t)NTHREADS;
  for (int i = 0; i < NTHREADS; i++) {
    chunks[i].data = data;
    chunks[i].start = chunk_size * (size_t)i;
    chunks[i].end = chunk_size * ((size_t)i + 1);
    pthread_create(&workers[i], NULL, process_chunk, &chunks[i]);
  }

  // wait for all threads to finish
  struct Result *results[NTHREADS];
  for (int i = 0; i < NTHREADS; i++) {
    pthread_join(workers[i], (void *)&results[i]);
  }

  // merge results
  char *label;
  struct Group *b;
  unsigned int h;
  int c;
  struct Result *result = results[0];
  for (int i = 1; i < NTHREADS; i++) {
    for (int j = 0; j < results[i]->n; j++) {
      b = &results[i]->groups[j];
      label = results[i]->labels[j];
      h = hash_probe(result->map, result->labels, label, (int)strlen(label));

      // TODO: Refactor lines below, we can share some logic with process_chunk
      c = result->map[h];
      if (c >= 0) {
        result->groups[c].count += b->count;
        result->groups[c].sum += b->sum;
        result->groups[c].min = min(result->groups[c].min, b->min);
        result->groups[c].max = max(result->groups[c].max, b->max);
      } else {
        // memcpy(&result->groups[result->n], b, sizeof(*b));
        strcpy(result->labels[result->n], label);
        result->groups[result->n].count = b->count;
        result->groups[result->n].sum = b->sum;
        result->groups[result->n].min = b->min;
        result->groups[result->n].max = b->max;
        result->groups[result->n].label = result->labels[result->n];
        result->map[h] = result->n++;
      }
    }
  }

  // sort results alphabetically
  qsort(result->groups, (size_t)result->n, sizeof(struct Group), cmp);

  // prepare output string
  char buf[(1 << 10) * 16];
  result_to_str(buf, result);
  puts(buf);

  clock_gettime(CLOCK_MONOTONIC, &end_time);
  double elapsed_time = (end_time.tv_sec - start_time.tv_sec) * 1000.0 +
                          (end_time.tv_nsec - start_time.tv_nsec) / 1000000.0;

  printf("Runtime inside main = %fms\n", elapsed_time);

  // // clean-up

  clock_gettime(CLOCK_MONOTONIC, &start_time);
  munmap(data, sz);
  clock_gettime(CLOCK_MONOTONIC, &end_time);
  elapsed_time = (end_time.tv_sec - start_time.tv_sec) * 1000.0 +
                          (end_time.tv_nsec - start_time.tv_nsec) / 1000000.0;                          
  printf("munmap cost = %fms\n", elapsed_time);

  clock_gettime(CLOCK_MONOTONIC, &start_time);
  close(fd);
  for (int i = 0; i < NTHREADS; i++) {
    free(results[i]);
  }
  clock_gettime(CLOCK_MONOTONIC, &end_time);

  elapsed_time = (end_time.tv_sec - start_time.tv_sec) * 1000.0 +
                          (end_time.tv_nsec - start_time.tv_nsec) / 1000000.0;                          
  printf("free memory cost = %fms\n", elapsed_time);
  
  // exit(EXIT_SUCCESS);
}

// Runtime inside main = 343.277755ms
// munmap cost = 216.992869ms
// free memory cost = 19.699456ms
// real    0m0.582s
// user    0m29.590s
// sys     0m2.204s

// Dual EPYC 9354 128 threads
// ```
// Runtime inside main = 343.277755ms
// munmap cost = 216.992869ms
// free memory cost = 19.699456ms
// real    0m0.582s
// user    0m29.590s
// sys     0m2.204s
// ```

// AMD 2950X, 32 threads
// ```
// Runtime inside main = 979.854644ms
// munmap cost = 152.057272ms
// free memory cost = 3.055444ms
// real    0m1.137s
// user    0m28.855s
// sys     0m0.734s
// ```

// AMD 2950X, 1 thread
// ```
// Runtime inside main = 18154.117726ms
// munmap cost = 156.046306ms
// free memory cost = 0.126220ms
// real    0m18.312s
// user    0m17.956s
// sys     0m0.348s
// ```

lehuyduc avatar Jan 08 '24 09:01 lehuyduc

Ran your latest code again! On Threadripper 5995WX this time. Result at the end

#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <time.h>

#define MAX_DISTINCT_GROUPS 10000
#define MAX_GROUPBY_KEY_LENGTH 100
#define HCAP (1 << 14)

#ifndef NTHREADS
#define NTHREADS 32
#endif

// branchless min/max (on some machines at least)
#define min(a, b) (a ^ ((b ^ a) & -(b < a)));
#define max(a, b) (a ^ ((a ^ b) & -(a < b)));

struct Group {
  unsigned int count;
  long sum;
  int min;
  int max;
  char key[MAX_GROUPBY_KEY_LENGTH];
};

struct Result {
  int map[HCAP];
  unsigned int hashes[HCAP];
  int n;
  struct Group groups[MAX_DISTINCT_GROUPS];
};

struct Chunk {
  size_t start;
  size_t end;
  const char *data;
};

// parses a floating point number as an integer
// this is only possible because we know our data file has only a single decimal
static inline const char *parse_number(int *dest, const char *s) {

  // parse sign
  int mod;
  if (*s == '-') {
    mod = -1;
    s++;
  } else {
    mod = 1;
  }

  if (s[1] == '.') {
    *dest = ((s[0] * 10) + s[2] - ('0' * 11)) * mod;
    return s + 4;
  }

  *dest = (s[0] * 100 + s[1] * 10 + s[3] - ('0' * 111)) * mod;
  return s + 5;
}

// qsort callback
static int cmp(const void *ptr_a, const void *ptr_b) {
  return strcmp(((struct Group *)ptr_a)->key, ((struct Group *)ptr_b)->key);
}

// finds hash slot in map of key
static inline unsigned int hash_probe(struct Result *result, const char *key) {

  // hash key
  unsigned int h = (unsigned char)key[0];
  unsigned int len = 1;
  for (; key[len] != 0x0; len++) {
    h = (h * 31) + (unsigned char)key[len];
  }

  // linearly probe hashmap until match OR free spot
  while (result->hashes[h & (HCAP - 1)] != 0 &&
         h != result->hashes[h & (HCAP - 1)]) {
    h++;
  }

  return h;
}

static void *process_chunk(void *ptr) {
  struct Chunk *ch = (struct Chunk *)ptr;

  // skip start forward until SOF or after next newline
  if (ch->start > 0) {
    while (ch->data[ch->start - 1] != '\n') {
      ch->start++;
    }
  }

  while (ch->data[ch->end] != 0x0 && ch->data[ch->end - 1] != '\n') {
    ch->end++;
  }

  struct Result *result = malloc(sizeof(*result));
  if (!result) {
    perror("malloc error");
    exit(EXIT_FAILURE);
  }
  result->n = 0;

  memset(result->hashes, 0, HCAP * sizeof(int));
  memset(result->map, -1, HCAP * sizeof(int));

  const char *s = &ch->data[ch->start];
  const char *end = &ch->data[ch->end];
  const char *linestart;
  unsigned int h;
  int temperature;
  int len;
  int c;

  while (s != end) {
    linestart = s;

    // hash everything up to ';'
    // assumption: key is at least 1 char
    len = 1;
    h = (unsigned char)s[0];
    while (s[len] != ';') {
      h = (h * 31) + (unsigned char)s[len++];
    }

    // parse decimal number as int
    s = parse_number(&temperature, s + len + 1);

    // probe map until free spot or match
    while (result->hashes[h & (HCAP - 1)] != 0 &&
           h != result->hashes[h & (HCAP - 1)]) {
      h++;
    }
    c = result->map[h & (HCAP - 1)];

    if (c < 0) {
      memcpy(result->groups[result->n].key, linestart, (size_t)len);
      result->groups[result->n].key[len] = 0x0;
      result->groups[result->n].count = 1;
      result->groups[result->n].sum = temperature;
      result->groups[result->n].min = temperature;
      result->groups[result->n].max = temperature;
      result->map[h & (HCAP - 1)] = result->n++;
      result->hashes[h & (HCAP - 1)] = h;
    } else {
      result->groups[c].count += 1;
      result->groups[c].sum += temperature;
      if (temperature < result->groups[c].min) {
        result->groups[c].min = temperature;
      } else if (temperature > result->groups[c].max) {
        result->groups[c].max = temperature;
      }
    }
  }

  return (void *)result;
}

void result_to_str(char *dest, const struct Result *result) {
  char buf[128];
  *dest++ = '{';
  for (int i = 0; i < result->n; i++) {
    size_t n = (size_t)sprintf(
        buf, "%s=%.1f/%.1f/%.1f", result->groups[i].key,
        (float)result->groups[i].min / 10.0,
        ((float)result->groups[i].sum / (float)result->groups[i].count) / 10.0,
        (float)result->groups[i].max / 10.0);

    memcpy(dest, buf, n);
    if (i < result->n - 1) {
      memcpy(dest + n, ", ", 2);
      n += 2;
    }

    dest += n;
  }
  *dest++ = '}';
  *dest = 0x0;
}

int main(int argc, char **argv) {
  struct timespec start_time, end_time;
  clock_gettime(CLOCK_MONOTONIC, &start_time);

  char *file = "measurements.txt";
  if (argc > 1) {
    file = argv[1];
  }

  int fd = open(file, O_RDONLY);
  if (!fd) {
    perror("error opening file");
    exit(EXIT_FAILURE);
  }

  struct stat sb;
  if (fstat(fd, &sb) == -1) {
    perror("error getting file size");
    exit(EXIT_FAILURE);
  }

  // mmap entire file into memory
  size_t sz = (size_t)sb.st_size;
  const char *data = mmap(NULL, sz, PROT_READ, MAP_SHARED, fd, 0);
  if (data == MAP_FAILED) {
    perror("error mmapping file");
    exit(EXIT_FAILURE);
  }

  // distribute work among N worker threads
  pthread_t workers[NTHREADS];
  struct Chunk chunks[NTHREADS];
  size_t chunk_size = sz / (size_t)NTHREADS;
  for (int i = 0; i < NTHREADS; i++) {
    chunks[i].data = data;
    chunks[i].start = chunk_size * (size_t)i;
    chunks[i].end = chunk_size * ((size_t)i + 1);
    pthread_create(&workers[i], NULL, process_chunk, &chunks[i]);
  }

  // wait for all threads to finish
  struct Result *results[NTHREADS];
  for (int i = 0; i < NTHREADS; i++) {
    pthread_join(workers[i], (void *)&results[i]);
  }

  // merge results
  struct Group *b;
  unsigned int h;
  int c;
  struct Result *result = results[0];
  for (int i = 1; i < NTHREADS; i++) {
    for (int j = 0; j < results[i]->n; j++) {
      b = &results[i]->groups[j];
      h = hash_probe(result, b->key);

      // TODO: Refactor lines below, we can share some logic with process_chunk
      c = result->map[h & (HCAP - 1)];
      if (c >= 0) {
        result->groups[c].count += b->count;
        result->groups[c].sum += b->sum;
        result->groups[c].min = min(result->groups[c].min, b->min);
        result->groups[c].max = max(result->groups[c].max, b->max);
      } else {
        strcpy(result->groups[result->n].key, b->key);
        result->groups[result->n].count = b->count;
        result->groups[result->n].sum = b->sum;
        result->groups[result->n].min = b->min;
        result->groups[result->n].max = b->max;
        result->map[h & (HCAP - 1)] = result->n++;
        result->hashes[h & (HCAP - 1)] = h;
      }
    }
  }

  // sort results alphabetically
  qsort(result->groups, (size_t)result->n, sizeof(struct Group), cmp);

  // prepare output string
  char buf[(1 << 10) * 16];
  result_to_str(buf, result);
  puts(buf);

  clock_gettime(CLOCK_MONOTONIC, &end_time);
  double elapsed_time = (end_time.tv_sec - start_time.tv_sec) * 1000.0 +
                          (end_time.tv_nsec - start_time.tv_nsec) / 1000000.0;

  printf("Runtime inside main = %fms\n", elapsed_time);

  clock_gettime(CLOCK_MONOTONIC, &start_time);
  munmap((void *)data, sz);
  clock_gettime(CLOCK_MONOTONIC, &end_time);
  elapsed_time = (end_time.tv_sec - start_time.tv_sec) * 1000.0 +
                          (end_time.tv_nsec - start_time.tv_nsec) / 1000000.0;                          
  printf("munmap cost = %fms\n", elapsed_time);

  // clean-up
  clock_gettime(CLOCK_MONOTONIC, &start_time);
  close(fd);
  for (int i = 0; i < NTHREADS; i++) {
    free(results[i]);
  }
  clock_gettime(CLOCK_MONOTONIC, &end_time);
  elapsed_time = (end_time.tv_sec - start_time.tv_sec) * 1000.0 +
                          (end_time.tv_nsec - start_time.tv_nsec) / 1000000.0;                          
  printf("free memory cost = %fms\n", elapsed_time);

  exit(EXIT_SUCCESS);
}

// 5995 WX 128 threads
// Runtime inside main = 256.936238ms
// munmap cost = 188.615260ms
// free memory cost = 1.810737ms
// real    0m0.449s
// user    0m26.775s
// sys     0m0.901s

// Runtime inside main = 235.476961ms
// munmap cost = 187.020981ms
// free memory cost = 1.696442ms
// real    0m0.426s
// user    0m26.654s
// sys     0m0.851s

// Runtime inside main = 241.524251ms
// munmap cost = 190.864014ms
// free memory cost = 1.763929ms
// real    0m0.436s
// user    0m27.065s
// sys     0m0.879s

// 2950X
// 32 thread
// Runtime inside main = 796.276934ms
// munmap cost = 155.028533ms
// free memory cost = 0.499529ms

// real	0m0.954s
// user	0m23.244s
// sys	0m0.679s

// 8
// Runtime inside main = 2008.665137ms
// munmap cost = 151.587516ms
// free memory cost = 0.141048ms
// real	0m2.162s
// user	0m15.489s
// sys	0m0.496s

// 1
// Runtime inside main = 15068.932848ms
// munmap cost = 152.826194ms
// free memory cost = 0.031580ms
// real	0m15.223s
// user	0m14.759s
// sys	0m0.456s

lehuyduc avatar Jan 10 '24 10:01 lehuyduc

Whoa @lehuyduc - thanks again. Didn't except such a big improvement from those last few micro-optimizations.

dannyvankooten avatar Jan 10 '24 10:01 dannyvankooten

Didn't except such a big improvement from those last few micro-optimizations.

Same. A few days ago I thought I've squeezed out everything, but slowly my final time went from 0.22 -> 0.18x -> 0.17x -> 0.16x -> ... -> 0.151s :D

lehuyduc avatar Jan 10 '24 10:01 lehuyduc