
File rotation limitations

Open softexpert opened this issue 11 months ago • 3 comments

Hello phuslu,

You have added a very nice set of features to the library; there are, however, a few limitations related to file rotation that currently give me some trouble:

  1. The rotation logic assumes that only gzip is used. However, since you exposed an important part of the logic through the Cleaner attribute, you should also expose the filtering of the files so that different compression formats can be used. I could, for example, compress as ZSTD, 7Z, or LZ4, and I would need the corresponding extensions to be evaluated by the Rotate method.

  2. The rotation logic assumes that the code will be executed on Linux, and it has some Linux-specific logic (for example, retrieving the user and group IDs). The same is needed in a Windows context, so there should be Windows-specific logic as well.

Perhaps the best approach would be to declare a default rotation handler per platform and also to expose the following in the called method (see the sketch after this list):

  • the filter to be used for the file names
  • the enumeration of the files in the log folder
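
To make this concrete, here is a sketch of the shape such hooks could take; every name in it is hypothetical and not part of the library's actual API:

// RotateHooks is a hypothetical set of extension points for the file
// writer, for illustration only.
type RotateHooks struct {
	// Match reports whether a file in the log folder belongs to this writer
	// and should be considered during rotation and cleanup (e.g. accepting
	// ".zst", ".7z", or ".lz4" in addition to ".gz").
	Match func(name string) bool

	// List enumerates the candidate files in the log folder, so callers can
	// plug in their own enumeration (recursive walks, custom ordering, etc.).
	List func(dir string) ([]string, error)
}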

I hope my explanations did not create confusion.

Do you think it would be complicated to implement?

Thank you in advance for your feedback!

softexpert avatar Jan 10 '25 17:01 softexpert

compress as ZSTD, 7Z, or LZ4, and I would need the corresponding extensions to be evaluated by the Rotate method.

Agreed, let me try to fix that limitation (described below).

for example, retrieving the user and group IDs

Currently I think of it as a bonus for running via "sudo ./program". I don't think we need to add similar things for other platforms (i.e. Windows).

the filter to be used for the file names

First I'd like to add/expose a field in FileWriter, maybe named CompressSuffixs []string, using []string{".gz", ".bz2", ".zstd", ".lz4"} as the default. Let me implement it this way first, because I think exposing a value field instead of a callback/function field makes users feel calm.
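
A minimal sketch of how such a suffix list could drive the filtering; hasCompressSuffix is a hypothetical helper, not existing library code, and assumes the standard strings package:

// hasCompressSuffix reports whether name ends in one of the configured
// compressed-log suffixes, i.e. it is already a compressed rotation artifact.
func hasCompressSuffix(name string, suffixes []string) bool {
	for _, s := range suffixes {
		if strings.HasSuffix(name, s) {
			return true
		}
	}
	return false
}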

the enumeration of the files in the log folder

I think that may go a bit too far. In that case, people can just copy file.go into a new file and make their own modifications. That may lose the "AsyncWriter" ability, though; I will fix the first issue first.

Overall, thanks for pointing this out; it made me realize that the file writer of this lib still needs improvement.

phuslu avatar Jan 11 '25 02:01 phuslu

Thank you for the quick response, phuslu! Since there are major differences in how compression commands are made available (sometimes you need to declare specific executable paths on Windows) and in how rights are handled, it definitely requires conditional compilation and platform-specific code. Another aspect is that, no matter how rich the implemented logic is, there might be use cases that cannot benefit from it (what if I want to rotate the log at 10:00 AM and push it as LZ4 to one folder, then rotate it again at 17:30 and push it as 7Z to another folder?). It would probably be best to have a callback option as well, besides the standard method, so that users can implement their own logic.

It definitely requires some reflection, since it can become quite complex... On my side, I ended up writing a helper that I invoke from a scheduler, for now.

softexpert avatar Jan 11 '25 05:01 softexpert

If it helps, here is the core of my log compression approach; utils.SuspendableTicker is just a fancy ticker that can be paused and restarted.

package logging

import (
	"FA/scanxp/utils"
	"context"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

type LogCompressor struct {
	cType CompressorType
	cPath string // relative or absolute path of the compression app
	cOK   bool   // flag telling that compressor can work

	processedFolder string          // folder that will be inspected
	logExtFilter    map[string]bool // extensions to be accepted / disregarded

	patAccept string // pattern used to accept files
	patIgnore string // pattern used to ignore files

	compressOlderThan         time.Duration // defaults to 1 min; recently modified files could still be written to so it is best to skip them
	removeOlderThan           time.Duration // defaults to 10 days; compressed files older than this will be deleted
	compressSmallAndOlderThan time.Duration // defaults to 15 min; small files older than this are compressed anyway, since the service was likely restarted and they will not be written to again

	minSizeToCompress int64

	ctxLC   context.Context
	cancelF context.CancelFunc

	sched *utils.SuspendableTicker
}

type CompressorType uint

// also take a look at https://github.com/mholt/archives for decompression
const (
	CmpLZ4  CompressorType = iota // in Windows needs https://github.com/lz4/lz4/releases
	Cmp7Z                         // in Windows needs https://www.7-zip.org/ or https://github.com/mcmilk/7-Zip-zstd/releases
	CmpZSTD                       // in Windows needs https://github.com/facebook/zstd/releases
	CmpGZ                         // in Windows needs https://sourceforge.net/projects/gnuwin32/files/gzip/
)

var (
	cmp4Logs           *LogCompressor
	onceInitCompressor sync.Once
)

// GetCompressor lazily initializes and returns the process-wide compressor singleton.
func GetCompressor(cType CompressorType, useRelPath bool) *LogCompressor {
	onceInitCompressor.Do(func() {
		initCompressor(cType, useRelPath)
	})
	return cmp4Logs
}

func (lc *LogCompressor) WithContext(ctx context.Context) *LogCompressor {
	lc.ctxLC = ctx
	return lc
}

func (lc *LogCompressor) WithContextAndCancel(ctx context.Context, cF context.CancelFunc) *LogCompressor {
	lc.ctxLC = ctx
	lc.cancelF = cF
	return lc
}

func (lc *LogCompressor) WithLogFolder(fldr string) *LogCompressor {
	lc.processedFolder = fldr
	return lc
}

func (lc *LogCompressor) WithAcceptableLogFilters(fltrs ...string) *LogCompressor {
	if lc.logExtFilter == nil {
		lc.logExtFilter = map[string]bool{}
	}
	for _, v := range fltrs {
		lc.logExtFilter[v] = true
	}
	return lc
}

func (lc *LogCompressor) WithUnacceptableLogFilters(fltrs ...string) *LogCompressor {
	if lc.logExtFilter == nil {
		lc.logExtFilter = map[string]bool{}
	}
	for _, v := range fltrs {
		lc.logExtFilter[v] = false
	}
	return lc
}

func (lc *LogCompressor) WithPatternFilters(accept string, ignore string) *LogCompressor {
	lc.patAccept = accept
	lc.patIgnore = ignore
	return lc
}

// WithDurations sets the minimum file age before compression, the maximum age
// before compressed files are removed, and the age at which small files are
// compressed regardless of their size.
func (lc *LogCompressor) WithDurations(minAge4Compress time.Duration, maxAge4Keep time.Duration, minAgeForceCompress time.Duration) *LogCompressor {
	lc.compressOlderThan = minAge4Compress
	lc.removeOlderThan = maxAge4Keep
	if minAgeForceCompress > 0 && minAgeForceCompress < time.Minute*15 {
		lc.compressSmallAndOlderThan = time.Minute * 15 // clamp to a 15-minute floor
	} else {
		lc.compressSmallAndOlderThan = minAgeForceCompress // zero (or negative) disables forced compression of small files
	}

	return lc
}

func (lc *LogCompressor) WithSizeCriteria(minSize int64) *LogCompressor {
	lc.minSizeToCompress = minSize
	return lc
}

func (lc *LogCompressor) DisableScheduler() *LogCompressor {
	if lc.cancelF != nil { // guard against a compressor built without a cancel func
		lc.cancelF()
	}
	if lc.sched != nil { // guard against the scheduler never having been enabled
		lc.sched.Stop()
	}
	return lc
}

func (lc *LogCompressor) EnableScheduler(freq time.Duration) *LogCompressor {
	// cannot accept less than 5 min between cycles
	if freq < (time.Minute * 5) {
		freq = time.Minute * 5
	}
	lc.sched = utils.NewSuspendableTicker(lc.ctxLC, freq, true)
	go func() {
		for {
			select {
			case <-lc.ctxLC.Done():
				lc.sched.Stop()
				if lc.cancelF != nil { // signals that the processing needs to stop
					lc.cancelF()
				}
				return
			case <-lc.sched.TC:
				lc.sched.Suspend()
				lc.ProcessLogFiles(1)
				time.Sleep(time.Second * 10) // give it time to settle
				lc.sched.Resume()
			}
		}
	}()
	lc.sched.Resume()
	return lc
}

func initCompressor(cType CompressorType, relPath bool) {
	var (
		exePath string
		err     error
	)

	cmp4Logs = &LogCompressor{
		cType:             cType,
		cOK:               true,
		compressOlderThan: time.Minute,     // by default 1 minute; files younger than this will be left to settle
		removeOlderThan:   time.Hour * 240, // by default, compressed log files older than 10 days are removed
	}

	switch cType {
	case CmpLZ4:
		cmp4Logs.cPath = "lz4"
	case Cmp7Z:
		cmp4Logs.cPath = "7z"
	case CmpZSTD:
		cmp4Logs.cPath = "zstd"
	case CmpGZ:
		cmp4Logs.cPath = "gzip"
	}
	if !relPath {
		exePath, err = os.Executable()
		if err == nil { // only derive the absolute path when the executable path was resolved
			cmp4Logs.cPath, err = filepath.Abs(filepath.Join(filepath.Dir(exePath), cmp4Logs.cPath))
		}
		if err != nil {
			cmp4Logs.cOK = false
		}
	}
}

func (c LogCompressor) Compress(fn string, keepOldFile bool) (err error) {
	if !c.cOK {
		return nil
	}
	// Each external command runs synchronously; Run waits for completion.
	switch c.cType {
	case CmpLZ4:
		if keepOldFile {
			err = exec.Command(c.cPath, "-12", "-f", fn, fn+".lz4").Run()
		} else {
			err = exec.Command(c.cPath, "-12", "-f", "--rm", fn, fn+".lz4").Run()
		}
	case Cmp7Z:
		if keepOldFile {
			err = exec.Command(c.cPath, "a", fn+".7z", fn).Run()
		} else {
			err = exec.Command(c.cPath, "a", "-sdel", fn+".7z", fn).Run()
		}
	case CmpZSTD:
		if keepOldFile {
			err = exec.Command(c.cPath, "-12", "-k", "-f", fn, "-o", fn+".zstd").Run()
		} else {
			err = exec.Command(c.cPath, "-12", "--rm", "-f", fn, "-o", fn+".zstd").Run()
		}
	case CmpGZ:
		if keepOldFile {
			err = exec.Command(c.cPath, "-9", "-k", "-f", fn).Run()
		} else {
			err = exec.Command(c.cPath, "-9", "-f", fn).Run()
		}
	}
	return err
}

// https://towardsdev.com/golang-producer-consumer-from-naive-to-almost-production-ready-solution-fc7be66ad71c

func (lc *LogCompressor) ProcessLogFiles(nbWorkers int) {
	var (
		cExt     string
		dataChan chan string
	)
	switch lc.cType {
	case CmpLZ4:
		cExt = ".lz4"
	case Cmp7Z:
		cExt = ".7z"
	case CmpZSTD:
		cExt = ".zstd"
	case CmpGZ:
		cExt = ".gz"
	}

	if lc.logExtFilter == nil { // guard: assigning into a nil map panics
		lc.logExtFilter = map[string]bool{}
	}
	lc.logExtFilter[cExt] = false // exclude the compressor's own extension; we do not want to compress the same files again

	consumersClosedChan := make(chan struct{})
	dataChan = lc.retrieveFiles(nbWorkers, cExt)

	go lc.runConsumers(nbWorkers, dataChan, consumersClosedChan)

	<-consumersClosedChan
}

func (lc LogCompressor) runConsumers(NumOfConsumers int, dataChan chan string, consumersClosedChan chan struct{}) {
	defer close(consumersClosedChan)

	var wg sync.WaitGroup
	wg.Add(NumOfConsumers)
	for i := 0; i < NumOfConsumers; i++ {
		go func() {
			// Done must run on every exit path; otherwise a context
			// cancellation leaves wg.Wait() below blocked forever.
			defer wg.Done()
			for {
				select {
				case <-lc.ctxLC.Done():
					return
				case fn, ok := <-dataChan:
					if !ok {
						// producer closed the channel; ending consumer
						return
					}
					lc.Compress(fn, false)
				}
			}
		}()
	}
	wg.Wait()
}

// retrieveFiles produces a list of files to be compressed
func (lc LogCompressor) retrieveFiles(nbWorkers int, cExt string) (dataChan chan string) {
	dataChan = make(chan string, nbWorkers)

	go func(dch chan string, cext string) {
		defer close(dch)

		var (
			ext                  string
			iFound               int
			b                    bool
			acc                  bool
			fn                   string
			bUseAcc              bool = (len(lc.patAccept) > 0)    // use accept pattern
			bUseIgn              bool = (len(lc.patIgnore) > 0)    // use ignore pattern
			bUseMap              bool = (len(lc.logExtFilter) > 0) // use extensions collection map
			farr                 []os.DirEntry
			err                  error
			bRemoveOlder         bool = (lc.removeOlderThan > time.Hour) // enable file removing if interval is set to at least one hour
			bCompressSmallAndOld bool = (lc.compressSmallAndOlderThan > time.Minute*10)
			lctx                      = MakeSimpleContext("LogCompressor->retrieveFiles().(goroutine)")
			fAge                 time.Duration
			fp                   string
		)
		farr, err = os.ReadDir(lc.processedFolder)
		if err != nil {
			return
		}

	fileLoop:
		for _, f := range farr {
			select {
			case <-lc.ctxLC.Done():
				break fileLoop // a bare break would only exit the select, not the loop
			default:
				if f.IsDir() {
					lc.TraceMsg(lctx, "file `%s` is a directory; skipping", f.Name()) // fn is not assigned yet at this point
					continue
				}

				fn = f.Name()
				ext = filepath.Ext(fn)
				fp = filepath.Join(lc.processedFolder, fn)

				if bRemoveOlder {
					if strings.EqualFold(ext, cext) { // limit the logic strictly to the compressed log files
						lstatInfo, err := f.Info() // for entries from os.ReadDir, Info reports the symlink itself, not its target
						if err == nil {
							if lstatInfo.Mode()&os.ModeSymlink == 0 { // do not attempt to remove symlinks
								if time.Since(lstatInfo.ModTime()) > lc.removeOlderThan {
									err = os.Remove(fp)
									if err != nil {
										lc.TraceMsg(lctx, "removing older file `%s`: encountered an error: %v", fp, err)
									} else {
										lc.TraceMsg(lctx, "success removing older file: %s", fp)
										continue
									}
								}
							} else { // do not attempt to remove symlinks
								lc.TraceMsg(lctx, "file `%s` is symlink; skipping", fn)
								continue
							}
						} else {
							lc.TraceMsg(lctx, "analysing file `%s` info encountered an error: %v", fn, err)
						}
					}
				}

				if bUseIgn {
					if b, err = filepath.Match(lc.patIgnore, fn); b || err != nil { // filename matches the ignore pattern
						lc.TraceMsg(lctx, "file `%s` was refused because of matching the ignore pattern", fn)
						continue
					}
				}

				if bUseAcc {
					if b, err = filepath.Match(lc.patAccept, fn); !b || err != nil { // filename does not match the accept pattern
						lc.TraceMsg(lctx, "file `%s` was refused because of not matching the accept pattern", fn)
						continue
					}
				}

				if bUseMap {
					if len(ext) < 1 { // disregard files with no extension
						lc.TraceMsg(lctx, "file `%s` extension could not be evaluated", fn)
						continue
					}
					if acc, b = lc.logExtFilter[ext]; !acc || !b { // do not process an archived log or unselectable files
						lc.TraceMsg(lctx, "file `%s` extension does not satisfy extension criteria", fn)
						continue
					}
				}

				// it matched by name or extension; is it a regular file or a symlink?
				lstatInfo, err := f.Info() // for entries from os.ReadDir, Info reports the symlink itself, not its target
				if err != nil {
					lc.TraceMsg(lctx, "analysing file `%s` info encountered an error: %v", fn, err)
					continue
				}

				// we found a symlink
				if lstatInfo.Mode()&os.ModeSymlink != 0 {
					lc.TraceMsg(lctx, "file `%s` is symlink; skipping", fn)
					continue
				}

				// skip files recently modified (by default less than 1 min)
				if time.Since(lstatInfo.ModTime()) < lc.compressOlderThan {
					lc.TraceMsg(lctx, "file `%s` was modified recently; skipping", fn)
					continue
				}

				// evaluate small files - if they are not modified since a given time it means the service was restarted and they will not be touched again
				if lstatInfo.Size() < lc.minSizeToCompress {
					fAge = time.Since(lstatInfo.ModTime())
					if bCompressSmallAndOld && fAge > lc.compressSmallAndOlderThan {
						lc.TraceMsg(lctx, "small file `%s` of size `%d` and age `%v` will be compressed; it has reached the configured age threshold `%v`", fn, lstatInfo.Size(), fAge, lc.compressSmallAndOlderThan)
					} else {
						lc.TraceMsg(lctx, "file `%s` of size `%d` and age `%v` is candidate to compression, but has not reached the size threshold `%d`; skipping", fn, lstatInfo.Size(), fAge, lc.minSizeToCompress)
						continue
					}
				}

				// all previous conditions have been satisfied, so the file can be compressed
				lc.TraceMsg(lctx, "file `%s` was picked for compression processing", fn)
				iFound++
				dch <- fp // fp was computed above as filepath.Join(lc.processedFolder, fn)
			}
		}
	}(dataChan, cExt)

	return dataChan
}
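
For reference, wiring the compressor up looks roughly like this; the import path, folder, pattern, and duration values are purely illustrative:

package main

import (
	"context"
	"time"

	"FA/scanxp/logging" // assumed import path for the package above
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	lc := logging.GetCompressor(logging.CmpZSTD, true). // relative path: rely on zstd being on PATH
		WithContextAndCancel(ctx, cancel).
		WithLogFolder("/var/log/myapp").
		WithAcceptableLogFilters(".log").
		WithPatternFilters("*.log", "").
		WithDurations(time.Minute, 240*time.Hour, 15*time.Minute).
		WithSizeCriteria(64 * 1024). // skip files under 64 KiB until they reach the forced-compression age
		EnableScheduler(10 * time.Minute)
	defer lc.DisableScheduler()

	// ... run the application; compression cycles run in the background ...
}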

softexpert avatar Jan 14 '25 06:01 softexpert