Do not close tar stream before we are finished
Change-Id: I89db099e53c1f3eaf7bc2226e29e1578186e1f6b
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index 24fa849..3c80c69 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -424,6 +424,9 @@
// Flag to track if TAR stream is still open
@Volatile var tarStreamOpen = true
+ // Lock to synchronize TAR stream access between scanner and main thread
+ private val tarStreamLock = java.util.concurrent.locks.ReentrantLock()
+
fun String.hasCorrespondingBaseZip(): Boolean {
if (!this.matches(Regex(".*\\.([^/.]+)\\.zip$"))) return false
val baseZip = this.replace(Regex("\\.([^/.]+)\\.zip$"), ".zip")
@@ -737,11 +740,17 @@
incrementalProgressBar?.step()
}
- // Set flag before closing stream to prevent scanner from trying to write
- tarStreamOpen = false
+ // Acquire lock before closing stream to prevent concurrent scanner access
+ tarStreamLock.lock()
+ try {
+ // Set flag before closing stream to prevent scanner from trying to write
+ tarStreamOpen = false
- krillTarOutputStream!!.finish()
- krillTarOutputStream!!.close()
+ krillTarOutputStream!!.finish()
+ krillTarOutputStream!!.close()
+ } finally {
+ tarStreamLock.unlock()
+ }
// Close incremental progress bar if it was initialized
incrementalProgressBar?.close()
@@ -2979,19 +2988,25 @@
// We successfully claimed it - output now
val textData = krillData.remove(textId)
if (textData != null) {
- // Double-check stream is still open before writing
- if (tarStreamOpen) {
- try {
- outputKrillText(textId, textData)
- incrementalProgressBar?.step()
- outputCount++
- LOGGER.fine("Output text $textId (processed by ${relevantZips.size} ZIPs, ${krillData.size} still pending)")
- } catch (e: IOException) {
- // Stream may have been closed - stop trying to output
- LOGGER.warning("Cannot output text $textId: stream closed")
- tarStreamOpen = false
- break
+ // Acquire lock to prevent concurrent access with stream closing
+ tarStreamLock.lock()
+ try {
+ // Double-check stream is still open while holding the lock
+ if (tarStreamOpen) {
+ try {
+ outputKrillText(textId, textData)
+ incrementalProgressBar?.step()
+ outputCount++
+ LOGGER.fine("Output text $textId (processed by ${relevantZips.size} ZIPs, ${krillData.size} still pending)")
+ } catch (e: IOException) {
+ // Stream may have been closed - stop trying to output
+ LOGGER.warning("Cannot output text $textId: stream closed")
+ tarStreamOpen = false
+ break
+ }
}
+ } finally {
+ tarStreamLock.unlock()
}
}
@@ -3013,21 +3028,37 @@
private fun stopIncrementalWriterThread() {
if (incrementalOutputScheduler != null) {
LOGGER.info("Stopping incremental writer scheduler")
+
+ // Set shutdown flag first to prevent scanner from starting new work
shutdownIncrementalWriter = true
- // Do one final scan before shutting down
- scanAndOutputCompleteTexts()
+ // Use shutdownNow() to cancel any pending scheduled tasks immediately
+ val cancelledTasks = incrementalOutputScheduler?.shutdownNow()
+ LOGGER.fine("Cancelled ${cancelledTasks?.size ?: 0} pending scheduled tasks")
- // Use shutdownNow() to cancel any scheduled tasks immediately
- incrementalOutputScheduler?.shutdownNow()
+ // Wait for currently executing tasks to finish (increased timeout for safety)
try {
- if (!incrementalOutputScheduler!!.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
- LOGGER.warning("Writer scheduler did not terminate within timeout")
+ var waitTime = 0L
+ while (!incrementalOutputScheduler!!.awaitTermination(1, java.util.concurrent.TimeUnit.SECONDS)) {
+ waitTime++
+ if (waitTime >= 10) {
+ LOGGER.warning("Writer scheduler did not terminate after ${waitTime}s, giving up")
+ break
+ }
+ LOGGER.fine("Waiting for writer scheduler to terminate... (${waitTime}s)")
+ }
+ if (waitTime > 0) {
+ LOGGER.info("Writer scheduler terminated after ${waitTime}s")
}
} catch (e: InterruptedException) {
LOGGER.warning("Interrupted while stopping writer scheduler")
Thread.currentThread().interrupt()
}
+
+ // Do one final scan after scheduler has stopped completely
+ LOGGER.info("Doing final scan for remaining texts...")
+ scanAndOutputCompleteTexts()
+
incrementalOutputScheduler = null
}
}