Allow multiple CPUs to work on extraction — an idea for the project.
Hey there!
I have a suggestion and a proof-of-concept rewrite that may really help boost performance. Note that it isn't battle-tested yet, but it may give you some ideas. My challenge is that extraction is CPU-bound: while my disk sits mostly idle, extracting large files takes quite some time. Here's what I've put together — I'd love to know your thoughts.
package xtractr
import (
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/bodgit/sevenzip"
)
// --- Placeholder structs and methods for context ---

// XFile holds the configuration and state for an extraction operation.
// A Mutex is added to handle concurrent filesystem access safely.
//
// NOTE(review): because this struct embeds a sync.Mutex it must not be
// copied after first use; always pass *XFile.
type XFile struct {
	FilePath  string      // path to the archive being extracted
	OutputDir string      // directory the archive contents are extracted into
	FileMode  os.FileMode // mode intended for extracted files
	DirMode   os.FileMode // mode applied to created directories
	Password  string      // single archive password; tried first when set
	Passwords []string    // additional candidate passwords, tried in order
	mu        sync.Mutex  // Protects filesystem operations
}
// file represents a single file to be written to disk.
type file struct {
	Path     string        // destination path on disk (already cleaned by XFile.clean)
	Data     io.ReadCloser // entry contents; closed by the caller (un7zip defers the Close)
	FileMode os.FileMode   // file mode recorded in the archive entry
	DirMode  os.FileMode   // mode for any parent directories that must be created
	Mtime    time.Time     // modification time from the archive entry
	Atime    time.Time     // access time from the archive entry
}
// ErrInvalidPath is returned when an extracted file's path would fall
// outside the configured output directory (path-traversal guard).
//
// Fix: the message contains no formatting verbs, so errors.New is the
// idiomatic constructor; fmt.Errorf here is flagged by static analysis.
var ErrInvalidPath = errors.New("file path is invalid")
// Debugf is a placeholder for a logging function.
// It deliberately discards its arguments; wire it to a real logger (or
// restore the Printf below) to surface debug output.
func (x *XFile) Debugf(format string, v ...interface{}) {
	// Intentionally a no-op for this example, e.g.:
	// fmt.Printf(format+"\n", v...)
	_ = format
	_ = v
}
// clean is a placeholder for a path cleaning function.
// Rooting the archive-relative path at "/" lets filepath.Clean strip any
// leading ".." components before the path is joined onto OutputDir.
func (x *XFile) clean(path string) string {
	rooted := filepath.Clean("/" + path)
	return filepath.Join(x.OutputDir, rooted)
}
// mkDir is a placeholder for a directory creation function.
// The mtime argument is accepted for interface parity but unused by this
// placeholder implementation.
func (x *XFile) mkDir(path string, mode os.FileMode, mtime time.Time) error {
	if err := os.MkdirAll(path, mode); err != nil {
		return err
	}

	return nil
}
// write streams f.Data to f.Path, creating parent directories first, and
// returns the number of bytes written. f.Data is NOT closed here; the
// caller owns it (un7zip closes it via defer).
//
// Fixes over the previous placeholder:
//   - parent directories use f.DirMode instead of x.DirMode (un7zip sets
//     f.DirMode from x.DirMode, so existing behavior is preserved);
//   - the file is created with f.FileMode so archived permissions are kept
//     (os.Create always used 0666 before the umask);
//   - the Close error is checked — io.Copy can succeed while Close reports
//     a delayed write failure;
//   - archive timestamps are restored when the entry carries them.
func (x *XFile) write(f *file) (int64, error) {
	if err := os.MkdirAll(filepath.Dir(f.Path), f.DirMode); err != nil {
		return 0, fmt.Errorf("os.MkdirAll: %w", err)
	}

	outFile, err := os.OpenFile(f.Path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.FileMode)
	if err != nil {
		return 0, fmt.Errorf("os.OpenFile: %w", err)
	}

	size, err := io.Copy(outFile, f.Data)
	if closeErr := outFile.Close(); err == nil {
		err = closeErr
	}

	if err != nil {
		return size, fmt.Errorf("writing file: %w", err)
	}

	// Restore timestamps recorded in the archive, when available.
	if !f.Mtime.IsZero() {
		atime := f.Atime
		if atime.IsZero() {
			atime = f.Mtime
		}

		if err := os.Chtimes(f.Path, atime, f.Mtime); err != nil {
			return size, fmt.Errorf("os.Chtimes: %w", err)
		}
	}

	return size, nil
}
// cleanup is a placeholder for a cleanup function.
// It currently does no work and hands the list back unchanged.
func (x *XFile) cleanup(files []string) ([]string, error) {
	return files, nil
}
// --- Main extraction logic ---

// result is a struct to hold the outcome of a single file extraction.
type result struct {
	size int64  // bytes written for this entry (0 for directories)
	file string // destination path for the entry, when known
	err  error  // extraction error for this entry, if any
}
// Extract7z extracts a 7zip archive.
// Volumes: https://github.com/bodgit/sevenzip/issues/54
//
// With no password configured, the archive is extracted directly.
// Otherwise every candidate password is tried in order (the single
// Password field first, then Passwords); the first success wins, and the
// final attempt's error is returned when none succeed.
func Extract7z(xFile *XFile) (size int64, filesList, archiveList []string, err error) {
	if xFile.Password == "" && len(xFile.Passwords) == 0 {
		return extract7z(xFile)
	}

	// Password attempts stay sequential: the only way to learn whether a
	// password is correct is to try it.
	candidates := xFile.Passwords
	if xFile.Password != "" {
		candidates = append([]string{xFile.Password}, xFile.Passwords...)
	}

	last := len(candidates) - 1

	for idx, pass := range candidates {
		attempt := &XFile{
			FilePath:  xFile.FilePath,
			OutputDir: xFile.OutputDir,
			FileMode:  xFile.FileMode,
			DirMode:   xFile.DirMode,
			Password:  pass,
		}

		bytes, files, archives, werr := extract7z(attempt)
		if werr == nil {
			return bytes, files, archives, nil
		}

		if idx == last {
			return bytes, files, archives,
				fmt.Errorf("used password %d of %d: %w", idx+1, len(candidates), werr)
		}
	}

	// Never reached: the loop always returns on its final iteration.
	return 0, nil, nil, nil
}
// extract7z now orchestrates the concurrent extraction.
//
// It opens the archive (optionally with a password), fans the entries out
// to a pool of workers over a buffered channel, waits for the pool to
// drain, then aggregates sizes, written paths, and the first error seen.
//
// NOTE(review): it is not evident from this file that sevenzip supports
// concurrent (*File).Open/Read against a single ReadCloser — 7z "solid"
// folders are sequential streams. Confirm with the library documentation
// before relying on a speedup here. Also note that un7zip holds x.mu
// across the disk write, which serializes much of the work.
func extract7z(xFile *XFile) (int64, []string, []string, error) {
	var (
		sevenZip *sevenzip.ReadCloser
		err      error
	)

	if xFile.Password != "" {
		sevenZip, err = sevenzip.OpenReaderWithPassword(xFile.FilePath, xFile.Password)
	} else {
		sevenZip, err = sevenzip.OpenReader(xFile.FilePath)
	}

	if err != nil {
		return 0, nil, nil, fmt.Errorf("%s: os.Open: %w", xFile.FilePath, err)
	}
	defer sevenZip.Close()

	// Determine the number of workers. A good default is the number of CPU cores.
	numWorkers := runtime.NumCPU()
	if numWorkers > len(sevenZip.File) {
		// Don't spin up more workers than files.
		numWorkers = len(sevenZip.File)
	}

	// Create channels for jobs and results. Both are buffered to the full
	// entry count, so neither the producer loop below nor any worker can
	// block on channel capacity.
	jobs := make(chan *sevenzip.File, len(sevenZip.File))
	results := make(chan result, len(sevenZip.File))

	// A WaitGroup is used to wait for all workers to finish.
	var wg sync.WaitGroup

	// Start the workers.
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go worker(w, xFile, &wg, jobs, results)
	}

	// Send all the files to be extracted to the jobs channel.
	// This cannot block: the jobs buffer holds every entry.
	for _, zipFile := range sevenZip.File {
		jobs <- zipFile
	}
	close(jobs)

	// Wait for all workers to complete their tasks. Safe to wait before
	// draining results because the results buffer also holds every entry.
	wg.Wait()
	close(results)

	// Collect the results from the results channel.
	var (
		totalSize int64
		files     []string
		firstErr  error
	)

	for res := range results {
		if res.err != nil {
			if firstErr == nil {
				firstErr = res.err
			}

			xFile.Debugf("Error extracting file: %s, error: %v", res.file, res.err)

			continue
		}

		if res.file != "" { // Only append if a file was actually written
			files = append(files, res.file)
		}

		totalSize += res.size
	}

	if firstErr != nil {
		return totalSize, files, sevenZip.Volumes(), firstErr
	}

	files, err = xFile.cleanup(files)

	return totalSize, files, sevenZip.Volumes(), err
}
// worker drains the jobs channel, extracting one archive entry at a time,
// and reports one result per entry. It runs until jobs is closed and
// drained, then signals completion on the WaitGroup.
func worker(id int, xFile *XFile, wg *sync.WaitGroup, jobs <-chan *sevenzip.File, results chan<- result) {
	defer wg.Done()

	for zipFile := range jobs {
		xFile.Debugf("Worker %d processing file: %s", id, zipFile.Name)

		size, written, err := xFile.un7zip(zipFile)

		// Report a best-guess destination path even when extraction failed.
		path := filepath.Join(xFile.OutputDir, zipFile.Name)
		if err == nil && written != "" {
			path = written
		}

		results <- result{size: size, file: path, err: err}
	}
}
// un7zip handles the extraction of a single file and is now thread-safe.
//
// It opens the archive entry, validates that the destination stays inside
// x.OutputDir, then (under x.mu) either creates the directory or writes
// the file contents. Returns bytes written, the destination path, and any
// error.
//
// Fix: the traversal guard previously used a bare strings.HasPrefix, which
// also accepts sibling paths — e.g. "/out/dir-evil" passes when OutputDir
// is "/out/dir". The check now cleans OutputDir and requires either an
// exact match or a prefix that ends at a path separator.
//
// NOTE(review): holding x.mu across x.write means decompression (io.Copy)
// is serialized across workers, which largely defeats the intended
// parallelism. Distinct entries target distinct paths and os.MkdirAll is
// safe to call concurrently, so the lock scope can likely shrink — confirm
// with a -race run and a benchmark before changing it.
func (x *XFile) un7zip(zipFile *sevenzip.File) (int64, string, error) {
	zFile, err := zipFile.Open()
	if err != nil {
		return 0, "", fmt.Errorf("zipFile.Open: %w", err)
	}
	defer zFile.Close()

	file := &file{
		Path:     x.clean(zipFile.Name),
		Data:     zFile,
		FileMode: zipFile.Mode(),
		DirMode:  x.DirMode,
		Mtime:    zipFile.Modified,
		Atime:    zipFile.Accessed,
	}

	// Reject entries that would land outside the output directory.
	// Comparing against a cleaned OutputDir plus a trailing separator
	// blocks both "../" traversal and sibling-directory prefix collisions.
	outDir := filepath.Clean(x.OutputDir)
	if file.Path != outDir && !strings.HasPrefix(file.Path, outDir+string(filepath.Separator)) {
		err := fmt.Errorf("%s: %w: %s (from: %s)", zipFile.FileInfo().Name(), ErrInvalidPath, file.Path, zipFile.Name)
		return 0, file.Path, err
	}

	// --- THREAD SAFETY FIX ---
	// Lock the mutex before touching the filesystem to prevent race conditions.
	x.mu.Lock()
	defer x.mu.Unlock() // Unlock when the function returns.

	if zipFile.FileInfo().IsDir() {
		x.Debugf("Writing archived directory: %s", file.Path)

		if err := x.mkDir(file.Path, zipFile.Mode(), zipFile.Modified); err != nil {
			return 0, file.Path, fmt.Errorf("making zipFile dir: %w", err)
		}

		// Return the path but 0 size for directories.
		return 0, file.Path, nil
	}

	x.Debugf("Writing archived file: %s (packed: %d, unpacked: %d)",
		file.Path, zipFile.FileInfo().Size(), zipFile.UncompressedSize)

	s, err := x.write(file)
	if err != nil {
		return s, file.Path, fmt.Errorf("%s: %w: %s (from: %s)", zipFile.FileInfo().Name(), err, file.Path, zipFile.Name)
	}

	return s, file.Path, nil
}
This approach could be extended to all of the other extraction formats. It still needs testing, but I hope it helps the project.
Best!
Hello! Thanks for putting time into improving this library. If you submit this as a pull request, it will be much easier for me to compare your changes against what was already there. Can you do that?
Also, could you tell me what this actually improves? Does extracting a single large 7z file get faster with this change, or does it only parallelize extraction across multiple files within the archive?