File rotation limitations
Hello phuslu,
You have added a very nice set of features to the library; there are a few limitations related to file rotation that currently give me some trouble:
- the rotation logic assumes that only gzip is used. However, since you exposed an important part of the logic in the Cleaner attribute, you should also expose the filtering of the files so that different compression formats can be used. I could, for example, compress as ZSTD, 7Z, or LZ4, and I would need the corresponding extensions to be evaluated by the Rotate method.
- the rotation logic assumes that the code will be executed on Linux, and it has some Linux-specific logic (for example, retrieving the user and group IDs). But the same is needed in a Windows context, and there should be Windows-specific logic as well.
Perhaps the best approach would be to declare a default rotation handler per platform and also expose some variables in the called method (a rough sketch follows the list below):
- the filter to be used for the file names
- the enumeration of the files in the log folder
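Something along these lines, purely as an illustration (none of these names exist in the library; this is just to show the shape of the idea):

```go
// Hypothetical interface, with one default implementation per platform
// (selected with build tags, e.g. *_linux.go / *_windows.go files).
type RotationHandler interface {
	// Match reports whether a name in the log folder is a rotated or
	// compressed log file (".gz", ".zst", ".lz4", ".7z", ...).
	Match(name string) bool
	// List enumerates the candidate files in the log folder.
	List(dir string) ([]os.DirEntry, error)
	// Rotate performs the platform-specific post-rotation work
	// (e.g. chown to the original user on Linux, nothing special on Windows).
	Rotate(filename string) error
}
```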
I hope my explanations did not create confusion.
Do you think it is complicated to implement?
Thank you in advance for your feedback!
> compress as ZSTD, 7Z, or LZ4, and I would need the corresponding extensions to be evaluated by the Rotate method.
Agreed, let me try to fix that limitation (described below).
> for example, retrieving the user and group IDs
Currently I think of it as a bonus for running via "sudo ./program". I don't think we need to add similar things on other platforms (i.e. Windows).
> the filter to be used for the file names
First I'd like to add/expose a field in FileWriter, maybe named CompressSuffixs []string, using []string{".gz", ".bz2", ".zstd", ".lz4"} as the default. Let me implement it this way first, because I think exposing a value field instead of a callback/function field feels simpler and less intimidating to users.
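If that lands, usage might look something like this (CompressSuffixs is only the proposal above, not an existing field; the other fields are current FileWriter options):

```go
w := &log.FileWriter{
	Filename:   "logs/app.log",
	MaxSize:    100 << 20, // rotate when the file reaches ~100 MB
	MaxBackups: 7,
	// Proposed field -- not in the library yet: suffixes treated as rotated
	// archives by the cleanup logic, instead of hard-coding ".gz".
	CompressSuffixs: []string{".gz", ".bz2", ".zstd", ".lz4"},
}
```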
> the enumeration of the files in the log folder
I think that may go a bit far. In that case, people can just copy file.go and make their own modifications, though that may lose the "AsyncWriter" ability. I will fix the first issue first.
Overall, thanks for pointing this out; it made me realize that the file writer of this lib still needs improvement.
Thank you for the quick response, phuslu! Since there are major differences in how compression commands are made available (sometimes you need to declare specific executable paths on Windows) and in how the rights are handled, it definitely requires conditional compilation and platform-specific code. Another aspect is that, no matter how rich the implemented logic is, there may be use cases that cannot benefit from it (what if I want to rotate the log at 10:00 AM and push it as LZ4 to one folder, then rotate it again at 17:30 and push it as 7Z to another folder?). It would probably be best to have a callback option as well, besides the standard method, so that users can implement their own logic.
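To make the shape of that idea concrete, a purely hypothetical sketch (a PostRotate field does not exist in the library; all names here are invented):

```go
w := &log.FileWriter{
	Filename: "logs/app.log",
	// Hypothetical hook: called with the path of the file that was just
	// rotated out, replacing the built-in gzip step entirely.
	PostRotate: func(rotated string) error {
		base := filepath.Base(rotated)
		if time.Now().Hour() < 17 {
			// morning rotation: LZ4 into one archive folder
			return exec.Command("lz4", "--rm", rotated,
				filepath.Join("archive/am", base+".lz4")).Run()
		}
		// evening rotation: 7z into another folder, deleting the source
		return exec.Command("7z", "a", "-sdel",
			filepath.Join("archive/pm", base+".7z"), rotated).Run()
	},
}
```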
It definitely requires some thought, since it can become quite complex... On my side, I ended up making a helper that I call from a scheduler, for now.
If it helps, here is the core of my log compression approach; utils.SuspendableTicker is just a fancy ticker that can be paused and restarted.
```go
package logging
import (
"FA/scanxp/utils"
"context"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"time"
)
type LogCompressor struct {
cType CompressorType
cPath string // relative or absolute path of the compression app
cOK bool // flag telling that compressor can work
processedFolder string // folder that will be inspected
logExtFilter map[string]bool // extensions to be accepted / disregarded
	patAccept string // pattern to accept files
	patIgnore string // pattern to ignore files
compressOlderThan time.Duration // defaults to 1 min; recently modified files could still be written to so it is best to skip them
removeOlderThan time.Duration // defaults to 10 days; compressed files older than this will be deleted
	compressSmallAndOlderThan time.Duration // defaults to 15 min; small files left untouched for this long are compressed anyway
	minSizeToCompress int64 // files below this size are skipped unless they reach the age above
ctxLC context.Context
cancelF context.CancelFunc
sched *utils.SuspendableTicker
}
type CompressorType uint
// also take a look at https://github.com/mholt/archives for decompression
const (
CmpLZ4 CompressorType = iota // in Windows needs https://github.com/lz4/lz4/releases
Cmp7Z // in Windows needs https://www.7-zip.org/ or https://github.com/mcmilk/7-Zip-zstd/releases
CmpZSTD // in Windows needs https://github.com/facebook/zstd/releases
CmpGZ // in Windows needs https://sourceforge.net/projects/gnuwin32/files/gzip/
)
var (
cmp4Logs *LogCompressor
onceInitCompressor sync.Once
)
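// GetCompressor returns the process-wide compressor singleton; it is built on
// the first call, and any later calls ignore the cType/useRelPath arguments.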
func GetCompressor(cType CompressorType, useRelPath bool) *LogCompressor {
onceInitCompressor.Do(func() {
initCompressor(cType, useRelPath)
})
return cmp4Logs
}
func (lc *LogCompressor) WithContext(ctx context.Context) *LogCompressor {
lc.ctxLC = ctx
return lc
}
func (lc *LogCompressor) WithContextAndCancel(ctx context.Context, cF context.CancelFunc) *LogCompressor {
lc.ctxLC = ctx
lc.cancelF = cF
return lc
}
func (lc *LogCompressor) WithLogFolder(fldr string) *LogCompressor {
lc.processedFolder = fldr
return lc
}
func (lc *LogCompressor) WithAcceptableLogFilters(fltrs ...string) *LogCompressor {
if lc.logExtFilter == nil {
lc.logExtFilter = map[string]bool{}
}
for _, v := range fltrs {
lc.logExtFilter[v] = true
}
return lc
}
func (lc *LogCompressor) WithUnacceptableLogFilters(fltrs ...string) *LogCompressor {
if lc.logExtFilter == nil {
lc.logExtFilter = map[string]bool{}
}
for _, v := range fltrs {
lc.logExtFilter[v] = false
}
return lc
}
func (lc *LogCompressor) WithPatternFilters(accept string, ignore string) *LogCompressor {
lc.patAccept = accept
lc.patIgnore = ignore
return lc
}
// WithDurations sets the minimum age before a file is compressed, the maximum
// age before a compressed file is removed, and the age at which small files
// are compressed regardless of their size.
func (lc *LogCompressor) WithDurations(minAge4Compress time.Duration, maxAge4Keep time.Duration, minAgeForceCompress time.Duration) *LogCompressor {
lc.compressOlderThan = minAge4Compress
lc.removeOlderThan = maxAge4Keep
	// clamp non-zero values below 15 minutes up to the 15-minute floor; zero disables force-compression
	if minAgeForceCompress > 0 && minAgeForceCompress < time.Minute*15 {
lc.compressSmallAndOlderThan = time.Minute * 15
} else {
lc.compressSmallAndOlderThan = minAgeForceCompress
}
return lc
}
func (lc *LogCompressor) WithSizeCriteria(minSize int64) *LogCompressor {
lc.minSizeToCompress = minSize
return lc
}
func (lc *LogCompressor) DisableScheduler() *LogCompressor {
	if lc.cancelF != nil {
		lc.cancelF()
	}
	if lc.sched != nil { // guard: the scheduler may never have been enabled
		lc.sched.Stop()
	}
return lc
}
func (lc *LogCompressor) EnableScheduler(freq time.Duration) *LogCompressor {
// cannot accept less than 5 min between cycles
if freq < (time.Minute * 5) {
freq = time.Minute * 5
}
lc.sched = utils.NewSuspendableTicker(lc.ctxLC, freq, true)
go func() {
for {
select {
case <-lc.ctxLC.Done():
lc.sched.Stop()
if lc.cancelF != nil { // signals that the processing needs to stop
lc.cancelF()
}
return
			case <-lc.sched.TC:
lc.sched.Suspend()
lc.ProcessLogFiles(1)
time.Sleep(time.Second * 10) // give it time to settle
lc.sched.Resume()
}
}
}()
lc.sched.Resume()
return lc
}
func initCompressor(cType CompressorType, relPath bool) {
var (
exePath string
err error
)
cmp4Logs = &LogCompressor{
cType: cType,
cOK: true,
compressOlderThan: time.Minute, // by default 1 minute; files younger than this will be left to settle
removeOlderThan: time.Hour * 240, // by default, compressed log files older than 10 days are removed
}
switch cType {
case CmpLZ4:
cmp4Logs.cPath = "lz4"
case Cmp7Z:
cmp4Logs.cPath = "7z"
case CmpZSTD:
cmp4Logs.cPath = "zstd"
case CmpGZ:
cmp4Logs.cPath = "gzip"
}
	if !relPath {
		exePath, err = os.Executable()
		if err != nil { // check before the error is overwritten by the next call
			cmp4Logs.cOK = false
			return
		}
		cmp4Logs.cPath, err = filepath.Abs(filepath.Join(filepath.Dir(exePath), cmp4Logs.cPath))
		if err != nil {
			cmp4Logs.cOK = false
		}
	}
}
// Compress invokes the external compressor on fn and blocks until the command finishes.
func (c LogCompressor) Compress(fn string, keepOldFile bool) error {
	if !c.cOK {
		return nil
	}
	var cmd *exec.Cmd
	switch c.cType {
	case CmpLZ4:
		if keepOldFile {
			cmd = exec.Command(c.cPath, "-12", "-f", fn, fn+`.lz4`)
		} else {
			cmd = exec.Command(c.cPath, "-12", "-f", "--rm", fn, fn+`.lz4`)
		}
	case Cmp7Z:
		if keepOldFile {
			cmd = exec.Command(c.cPath, "a", fn+`.7z`, fn)
		} else {
			cmd = exec.Command(c.cPath, "a", "-sdel", fn+`.7z`, fn) // -sdel deletes the source after archiving
		}
	case CmpZSTD:
		if keepOldFile {
			cmd = exec.Command(c.cPath, "-12", "-k", "-f", fn, "-o", fn+`.zstd`)
		} else {
			cmd = exec.Command(c.cPath, "-12", "--rm", "-f", fn, "-o", fn+`.zstd`)
		}
	case CmpGZ:
		if keepOldFile {
			cmd = exec.Command(c.cPath, "-9", "-k", "-f", fn)
		} else {
			cmd = exec.Command(c.cPath, "-9", "-f", fn)
		}
	default:
		return nil
	}
	return cmd.Run()
}
// https://towardsdev.com/golang-producer-consumer-from-naive-to-almost-production-ready-solution-fc7be66ad71c
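// ProcessLogFiles runs one scan-and-compress cycle: it enumerates the log
// folder, streams candidate file names to nbWorkers consumers, and blocks
// until all of them have finished.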
func (lc *LogCompressor) ProcessLogFiles(nbWorkers int) {
var (
cExt string
dataChan chan string
)
switch lc.cType {
case CmpLZ4:
cExt = ".lz4"
case Cmp7Z:
cExt = ".7z"
case CmpZSTD:
cExt = ".zstd"
case CmpGZ:
cExt = ".gz"
}
	if lc.logExtFilter == nil { // guard against a nil map when no filters were configured
		lc.logExtFilter = map[string]bool{}
	}
	lc.logExtFilter[cExt] = false // exclude the compressor's own extension - we do not want to compress the same files again
consumersClosedChan := make(chan struct{})
dataChan = lc.retrieveFiles(nbWorkers, cExt)
go lc.runConsumers(nbWorkers, dataChan, consumersClosedChan)
<-consumersClosedChan
}
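// runConsumers starts NumOfConsumers goroutines that compress the file names
// received on dataChan, and closes consumersClosedChan once they have all exited.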
func (lc LogCompressor) runConsumers(NumOfConsumers int, dataChan chan string, consumersClosedChan chan struct{}) {
defer close(consumersClosedChan)
var wg sync.WaitGroup
wg.Add(NumOfConsumers)
	for i := 0; i < NumOfConsumers; i++ {
		go func() {
			defer wg.Done() // ensure Done runs on every exit path, including context cancellation
			for {
				select {
				case <-lc.ctxLC.Done():
					return
				case fn, ok := <-dataChan:
					if !ok {
						// channel closed: producer finished, ending consumer
						return
					}
					lc.Compress(fn, false)
				}
			}
		}()
	}
wg.Wait()
}
// retrieveFiles produces a list of files to be compressed
func (lc LogCompressor) retrieveFiles(nbWorkers int, cExt string) (dataChan chan string) {
dataChan = make(chan string, nbWorkers)
go func(dch chan string, cext string) {
defer close(dch)
var (
ext string
iFound int
b bool
acc bool
fn string
bUseAcc bool = (len(lc.patAccept) > 0) // use accept pattern
bUseIgn bool = (len(lc.patIgnore) > 0) // use ignore pattern
bUseMap bool = (len(lc.logExtFilter) > 0) // use extensions collection map
farr []os.DirEntry
err error
bRemoveOlder bool = (lc.removeOlderThan > time.Hour) // enable file removing if interval is set to at least one hour
bCompressSmallAndOld bool = (lc.compressSmallAndOlderThan > time.Minute*10)
lctx = MakeSimpleContext("LogCompressor->retrieveFiles().(goroutine)")
fAge time.Duration
fp string
)
farr, err = os.ReadDir(lc.processedFolder)
if err != nil {
return
}
		for _, f := range farr {
			select {
			case <-lc.ctxLC.Done():
				return // a bare break would only exit the select, not the loop
			default:
				if f.IsDir() {
					lc.TraceMsg(lctx, "file %s is a directory; skipping", f.Name())
continue
}
fn = f.Name()
ext = filepath.Ext(fn)
fp = filepath.Join(lc.processedFolder, fn)
if bRemoveOlder {
if strings.EqualFold(ext, cext) { // limit the logic strictly to the compressed log files
						lstatInfo, err := f.Info() // Lstat semantics: does not follow symlinks
if err == nil {
if lstatInfo.Mode()&os.ModeSymlink == 0 { // do not attempt to remove symlinks
if time.Since(lstatInfo.ModTime()) > lc.removeOlderThan {
err = os.Remove(fp)
if err != nil {
lc.TraceMsg(lctx, "removing older file `%s`: encountered an error: %v", fp, err)
} else {
lc.TraceMsg(lctx, "success removing older file: %s", fp)
continue
}
}
} else { // do not attempt to remove symlinks
lc.TraceMsg(lctx, "file `%s` is symlink; skipping", fn)
continue
}
} else {
lc.TraceMsg(lctx, "analysing file `%s` info encountered an error: %v", fn, err)
}
}
}
if bUseIgn {
if b, err = filepath.Match(lc.patIgnore, fn); b || err != nil { // filename matches the ignore pattern
lc.TraceMsg(lctx, "file `%s` was refused because of matching the ignore pattern", fn)
continue
}
}
if bUseAcc {
if b, err = filepath.Match(lc.patAccept, fn); !b || err != nil { // filename does not match the accept pattern
lc.TraceMsg(lctx, "file `%s` was refused because of not matching the accept pattern", fn)
continue
}
}
if bUseMap {
if len(ext) < 1 { // disregard files with no extension
lc.TraceMsg(lctx, "file `%s` extension could not be evaluated", fn)
continue
}
if acc, b = lc.logExtFilter[ext]; !acc || !b { // do not process an archived log or unselectable files
lc.TraceMsg(lctx, "file `%s` extension does not satisfy extension criteria", fn)
continue
}
}
// it matched by name or extension; is it a file or a symlink ?
				lstatInfo, err := f.Info() // Lstat semantics: does not follow symlinks
if err != nil {
lc.TraceMsg(lctx, "analysing file `%s` info encountered an error: %v", fn, err)
continue
}
// we found a symlink
if lstatInfo.Mode()&os.ModeSymlink != 0 {
lc.TraceMsg(lctx, "file `%s` is symlink; skipping", fn)
continue
}
// skip files recently modified (by default less than 1 min)
if time.Since(lstatInfo.ModTime()) < lc.compressOlderThan {
lc.TraceMsg(lctx, "file `%s` was modified recently; skipping", fn)
continue
}
// evaluate small files - if they are not modified since a given time it means the service was restarted and they will not be touched again
if lstatInfo.Size() < lc.minSizeToCompress {
fAge = time.Since(lstatInfo.ModTime())
if bCompressSmallAndOld && fAge > lc.compressSmallAndOlderThan {
lc.TraceMsg(lctx, "small file `%s` of size `%d` and age `%v` will be compressed; it has reached the configured age threshold `%v`", fn, lstatInfo.Size(), fAge, lc.compressSmallAndOlderThan)
} else {
lc.TraceMsg(lctx, "file `%s` of size `%d` and age `%v` is candidate to compression, but has not reached the size threshold `%d`; skipping", fn, lstatInfo.Size(), fAge, lc.minSizeToCompress)
continue
}
}
// all previous conditions have been satisfied, so the file can be compressed
lc.TraceMsg(lctx, "file `%s` was picked for compression processing", fn)
				iFound++
dch <- filepath.Join(lc.processedFolder, fn)
}
}
}(dataChan, cExt)
return dataChan
}
```
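For reference, wiring the helper up looks roughly like this (the folder, filters, and intervals are example values only):

```go
ctx, cancel := context.WithCancel(context.Background())
lc := logging.GetCompressor(logging.CmpZSTD, true). // true: resolve "zstd" via PATH
	WithContextAndCancel(ctx, cancel).
	WithLogFolder("logs").
	WithAcceptableLogFilters(".log", ".txt").
	WithDurations(time.Minute, 240*time.Hour, 15*time.Minute).
	WithSizeCriteria(4 * 1024). // skip files under 4 KiB until they are old enough
	EnableScheduler(30 * time.Minute)
defer lc.DisableScheduler()
```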