htslib icon indicating copy to clipboard operation
htslib copied to clipboard

Using POSIX and htslib to create x compressed vcf files using x threads

Open Npaffen opened this issue 4 months ago • 1 comments

I want to adjust the genotype of some large sample size vcf file by using global ancestry results. I have ancestry results for x origins so I want to recreate my vcf file with adjusted genotypes for those x origins. Since the filepointer in htslib are not threadsafe I used the POSIX library to create threads containing individual filepointer for each thread and then read the vcf, adjust the genotypes and write the vcf back to disk. Right now it looks like this:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include </usr/include/htslib/hts.h>
#include </usr/include/htslib/vcf.h>


typedef struct {
  char *name;
  char code;
} Origin;


typedef struct {
  int originIndex;
  char* vcfFilename;
  htsFile *fp;
  htsFile *fp2;
  bcf_hdr_t *hdr;
  // Add other necessary fields
} ThreadData;

// Declare global variables
Origin *origins;
char ***originLines;
char **tsvLines;
int originCount, tsvLineCount;
char *outputFilename = NULL;

bcf_hdr_t *hdr2 = NULL;
int allele1;
int allele2;

char **readTsvFile(FILE *file, int *lineCount);
int readOrigins(FILE *file, Origin **origins);
void* processOrigin(void* arg);


int main(int argc, char *argv[]) {
...

 for (int i = 0; i < originCount; ++i) {
    // Allocate and prepare outputFilename
    outputFilename = malloc(strlen(origins[i].name) + strlen(argv[4]) + 10); // for "_", ".vcf.gz", and null terminator
    if (!outputFilename) {
      fprintf(stderr, "Failed to allocate memory for outputFilename\n");
      // Handle error appropriately, possibly with cleanup and exit
      exit(EXIT_FAILURE); // Consider more graceful error handling
    }
    sprintf(outputFilename, "%s_%s.vcf.gz", origins[i].name, argv[4]);
    threadData[i].originIndex = i;
    threadData[i].vcfFilename = argv[1];
    threadData[i].fp   = bcf_open(argv[1], "r");
    threadData[i].fp2  = hts_open(outputFilename, "wz");
    // Read the VCF header
    threadData[i].hdr  = bcf_hdr_read(threadData[i].fp);
    // Assuming VCF filename is first argument
    // Initialize other fields of threadData[i] as necessary
    
    if (pthread_create(&threads[i], NULL, processOrigin, &threadData[i])) {
      fprintf(stderr, "Error creating thread for origin %d\n", i);
      // Handle error
    }
  }
  
  // Wait for all threads to finish
  for (int i = 0; i < originCount; ++i) {
    pthread_join(threads[i], NULL);
  }
  ...

with processOrigin :

// process origins 
void* processOrigin(void* arg) {
  ThreadData* data = (ThreadData*)arg;
  int originIndex = data->originIndex;
  char* vcfFilename = data->vcfFilename;
  htsFile *fp = data->fp;
  htsFile *fp2 = data->fp2;
  bcf_hdr_t *hdr = data->hdr;
  if (fp == NULL) {
    fprintf(stderr, "Error opening file %s\n", vcfFilename);
    exit(EXIT_FAILURE); // Consider more graceful error handling
  }
  
  

  bcf1_t *rec = bcf_init();
  int line_count = 0;
  
  // Process the VCF records
  while (bcf_read(fp, hdr, rec) == 0) {
    if (rec->unpacked == 0) {
      bcf_unpack(rec, BCF_UN_ALL);
      int n_sample = bcf_hdr_nsamples(hdr);
      int32_t *gts = (int32_t *)malloc(n_sample * 2 * sizeof(int32_t));
      int32_t *gt_arr = NULL, ngt_arr = 0;
      int ngt = bcf_get_genotypes(hdr, rec, &gt_arr, &ngt_arr);
      const char *chrom = bcf_hdr_id2name(hdr, rec->rid);
      int pos = rec->pos + 1; // VCF positions are 1-based
      float qual = rec->qual;
      char **alleles = rec->d.allele;
      
      
      if(line_count == 0){
        
        
        if (!fp2) {
          fprintf(stderr, "Failed to open %s for writing.\n", outputFilename);
          // Handle error appropriately, including freeing outputFilename
          free(outputFilename);
          exit(EXIT_FAILURE); // Consider more graceful error handling
        }
        
        // Initialize the header for the output file
        hdr2 = bcf_hdr_dup(hdr); // Duplicate the header from the input file
        if (!hdr2) {
          fprintf(stderr, "Failed to duplicate the VCF header.\n");
          // Handle error appropriately
          hts_close(fp2);
          free(outputFilename);
          exit(EXIT_FAILURE); // Consider more graceful error handling
        }
        
        // Write the header to the output file
        if (bcf_hdr_write(fp2, hdr2) < 0) {
          fprintf(stderr, "Failed to write the header to %s.\n", outputFilename);
          // Handle error appropriately
          bcf_hdr_destroy(hdr2);
          hts_close(fp2);
          free(outputFilename);
          exit(EXIT_FAILURE); // Consider more graceful error handling
        }
        
      }
      
      
      int tsvcol = 0;
      for (int i = 0; i < n_sample; i++) {
        
        int32_t allele1_raw = gt_arr[i * 2];
        int32_t allele2_raw = gt_arr[i * 2 + 1];
        
        // Check if the alleles are missing directly
        bool is_allele1_missing = bcf_gt_is_missing(allele1_raw);
        bool is_allele2_missing = bcf_gt_is_missing(allele2_raw);
        
        
        if (tsvLines[line_count][tsvcol] - '0' == origins[originIndex].code && is_allele1_missing == 0) { // Adjust comparison to match types
          gts[i * 2] = bcf_gt_phased(bcf_gt_allele(allele1_raw));
        } else{
          gts[i * 2] =  bcf_gt_missing;
        }  
        if (tsvLines[line_count][tsvcol + 2] - '0' == origins[originIndex].code && is_allele2_missing == 0) {
          gts[i * 2 + 1] = bcf_gt_phased(bcf_gt_allele(allele2_raw));
        } else{
          gts[i * 2 + 1] =  bcf_gt_missing;
        }  

        tsvcol += 4;
      }
      
      bcf_update_genotypes(hdr, rec, gts, n_sample * 2); // Update the record with genotypes
      if (bcf_write(fp2, hdr2, rec) < 0) {
        fprintf(stderr, "Error writing record to VCF file.\n");
        // Handle the error, such as by cleaning up resources and exiting if necessary
      }    
      line_count++;
      if (gt_arr) free(gt_arr);
      free(gts);
    }else{
      // Clean up
      bcf_destroy(rec);
      bcf_hdr_destroy(hdr);
      if(hdr2)    bcf_hdr_destroy(hdr2);
      bcf_close(fp);
      if(fp2)    bcf_close(fp2);
    }
  }
  
  
}

The output however seems to be truncated see:

bcftools view East_Asian_chr22.vcf.gz | wc -l
[W::bcf_sr_add_reader] No BGZF EOF marker; file 'East_Asian_chr22.vcf.gz' may be truncated
[E::bgzf_read_block] Failed to read BGZF block data at offset 519085134 expected 1144 bytes; hread returned 339
[E::vcf_parse_format] Number of columns at chr22:50806710 does not match the number of samples (8011 vs 8908)
Error: VCF parse error
1498497

It looks like there is a problem with the bcf_write function or, more likely, I made a mistake in my multi-threading logic. My singel-threading code is working without problems but since the file of tsvLines is very large I would like to read the file only once then use it in each thread to save some time.

Npaffen avatar Feb 07 '24 15:02 Npaffen