Datanode failover is not supported for writes
What is the correct way to retry when errors such as EOF or broken pipe are encountered while writing? I saw a comment in the code saying that when a failure is encountered, the stream is not currently able to recover by itself. Should I close the stream and invoke client.Append() to get a new one?
I have seen many comments saying that the file write stream does not recover automatically. Is a fix scheduled for the near future?
Currently, failover during writes is sketched out, but not implemented. See this comment: https://github.com/colinmarc/hdfs/blob/cdda132fb253decd0fb23f1bb183bad5fd8c7fe4/rpc/block_writer.go#L53-L58
There's even a (skipped) test for it, here: https://github.com/colinmarc/hdfs/blob/cdda132fb253decd0fb23f1bb183bad5fd8c7fe4/rpc/block_writer_test.go#L78-L101
Unfortunately, the process is extremely client-heavy, so it's very complex to implement. I'll leave this issue open to track it.
To answer your question, you should indeed close the stream and start a new write.
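For reference, here is a minimal sketch of what that close-and-reopen retry could look like against this package's public API (Client.Append, FileWriter.Write, FileWriter.Close). The function name, the retry limit, and the linear backoff are made up for illustration, and note that if the failed Write was partially applied, blindly re-writing the same buffer may duplicate data.

package hdfsretry

import (
	"log"
	"time"

	"github.com/colinmarc/hdfs"
)

// writeWithReopen is a sketch only: it retries a write by closing the broken
// stream and re-opening the file with Append, as suggested above.
func writeWithReopen(client *hdfs.Client, name string, data []byte, maxRetries int) error {
	writer, err := client.Append(name)
	if err != nil {
		return err
	}

	for attempt := 0; ; attempt++ {
		if _, err = writer.Write(data); err == nil {
			return writer.Close()
		}

		// The stream cannot recover by itself after a failure, so close it
		// (Close will often return an error too; it is only logged here)
		// and open a fresh one.
		if closeErr := writer.Close(); closeErr != nil {
			log.Printf("close after failed write on %s: %v", name, closeErr)
		}
		if attempt+1 >= maxRetries {
			return err
		}

		time.Sleep(time.Duration(attempt+1) * time.Second)
		if writer, err = client.Append(name); err != nil {
			return err
		}
	}
}

Note as well that Append itself can fail for a while right after an aborted write (the namenode may not yet have given up the previous lease on the file), so in practice that call may also need a delay and retry.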
I think this package is the 'native' HDFS client package for Go, so it would be very nice if you could add the failover recovery part. For the retrying, do you mean closing the same file and using Append() to get a new stream, or creating another file? By the way, if the write fails, how can I ensure that Close() will succeed? If Close() fails, I will lose the data that was not written to HDFS.
Hi, @colinmarc. I took your advice and used reopening to retry, but I constantly get the following error:
2017/08/25 03:26:00.320699 file_writer.go:108: [INFO] Close error:updateBlockForPipeline call failed with ERROR_APPLICATION (java.io.IOException) file:hdfs://rndhadoop001.rnd.fwmrm.net:8020/user/am/scan_task/2017-08-25/10.2.3.85_advanced_m/user-bak002-20170825032137.log
2017/08/25 03:26:00.321857 file_writer.go:114: [WARNING] Reopen error:append /user/am/scan_task/2017-08-25/10.2.3.85_advanced_m/user-bak002-20170825032137.log: append call failed with ERROR_APPLICATION (org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException) file:hdfs://rndhadoop001.rnd.fwmrm.net:8020/user/am/scan_task/2017-08-25/10.2.3.85_advanced_m/user-bak002-20170825032137.log
The retry logic:
for {
	_, err = w.hdfsWriter.Write(b)
	if nil == err {
		break
	}
	retry := 0
	lastSleepTime := 0
	closeOpenRetry := 0
	for nil != err && retry < MAX_WRITE_RETRY {
		LogWarning(WARN, INTERNAL_LOGIC, "Error:%s retry:%s", err.Error(), w.name)
		lastSleepTime = GetRetrySleepTime(BUSY_RETRY, lastSleepTime)
		time.Sleep(time.Duration(lastSleepTime) * time.Second)
		_, err = w.hdfsWriter.Write(b)
		retry++
	}
	for nil != err && closeOpenRetry < MAX_CLOSE_OPEN_RETRY {
		LogWarning(WARN, INTERNAL_LOGIC, "Error:%s closeopenretry:%s", err.Error(), w.name)
		lastSleepTime = GetRetrySleepTime(BUSY_RETRY, lastSleepTime)
		time.Sleep(time.Duration(lastSleepTime) * time.Second)
		err = w.hdfsWriter.Close()
		if nil != err {
			LogInfo("Close error:%s file:%s", err.Error(), w.name)
		}
		tFn := strings.TrimPrefix(w.name, HDFS_FILE_PREFIX)
		protocol := tFn[:strings.Index(tFn, "/")]
		tmpWriter, err := GetHdfsClient(protocol).Append(w.name[len(HDFS_FILE_PREFIX)+len(protocol):])
		if nil != err {
			LogWarning(WARN, INTERNAL_LOGIC, "Reopen error:%s file:%s", err.Error(), w.name)
		} else {
			w.hdfsWriter = tmpWriter
		}
	}
}
GetHdfsClient() returns an hdfs.Client, and w.hdfsWriter is an hdfs.FileWriter.
Can you help me with this? Thanks a lot.
@colinmarc: has this issue been fixed and just kept open, or is it yet to be fixed? I am still getting this error.
@colinmarc since this feature is not currently implemented, and it seems relatively easy to implement the basic logic in my own application, I was hoping you could expand a little more on @kof02guy's question regarding closing the open file to retry.
To answer your question, you should indeed close the stream and start a new write.
Based on the BlockWriter comment, how would one "close the stream" in the most correct way? I would imagine calling Close on that current FileWriter would fail due to the same error that caused Write to fail.
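For what it's worth, one way to read the earlier advice, purely as an assumption and not something confirmed in this thread, is to call Close on the failed FileWriter anyway, treat its error as expected, and then ask the client for a fresh writer via Append. A small sketch (the helper name is made up):

package hdfsretry

import (
	"log"

	"github.com/colinmarc/hdfs"
)

// reopenAfterFailure is a sketch: close the writer whose Write just failed
// and obtain a new writer for the same file.
func reopenAfterFailure(client *hdfs.Client, w *hdfs.FileWriter, name string) (*hdfs.FileWriter, error) {
	// Expect Close to fail here for the same underlying reason the Write
	// failed, so log the error rather than treating it as fatal.
	if closeErr := w.Close(); closeErr != nil {
		log.Printf("closing failed writer for %s: %v", name, closeErr)
	}

	// Get a fresh writer. In practice this call can itself keep failing for
	// a while (e.g. AlreadyBeingCreatedException, as seen above) and may
	// need its own retry loop with a delay.
	return client.Append(name)
}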