Log stalled pending Krill texts
Change-Id: I8dbba5ad96e6b2592eae291bca4051fe844986d0
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index a90213c..5403f5b 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -1183,6 +1183,7 @@
val krillData: ConcurrentHashMap<String, KrillJsonGenerator.KrillTextData> = ConcurrentHashMap()
val krillCompressedData: ConcurrentHashMap<String, CompressedKrillData> = ConcurrentHashMap()
val krillCompressionFutures: ConcurrentHashMap<String, java.util.concurrent.Future<*>> = ConcurrentHashMap()
+ val krillCompressionStartNanos: ConcurrentHashMap<String, Long> = ConcurrentHashMap()
val readyKrillTextIds: java.util.concurrent.BlockingQueue<String> = java.util.concurrent.LinkedBlockingQueue()
val corpusMetadata: ConcurrentHashMap<String, MutableMap<String, Any>> = ConcurrentHashMap()
val docMetadata: ConcurrentHashMap<String, MutableMap<String, Any>> = ConcurrentHashMap()
@@ -1195,6 +1196,10 @@
private val krillPeakReadyQueueDepth = AtomicInteger(0)
@Volatile private var lastKrillStateLogNanos = 0L
private val krillStateLogIntervalNanos = java.util.concurrent.TimeUnit.SECONDS.toNanos(30)
+ @Volatile private var lastKrillOutputProgressNanos = 0L
+ @Volatile private var lastKrillPendingDetailLogNanos = 0L
+ private val krillPendingDetailLogIntervalNanos = java.util.concurrent.TimeUnit.MINUTES.toNanos(2)
+ private val krillNoProgressWarnNanos = java.util.concurrent.TimeUnit.MINUTES.toNanos(5)
// Compression thread pool for parallel GZIP compression
var compressionExecutor: java.util.concurrent.ExecutorService? = null
@@ -1251,7 +1256,12 @@
krillPeakCompressionInFlight.set(0)
krillPeakReadyQueueDepth.set(0)
lastKrillStateLogNanos = 0L
+ lastKrillOutputProgressNanos = System.nanoTime()
+ lastKrillPendingDetailLogNanos = 0L
krillData.clear()
+ krillCompressedData.clear()
+ krillCompressionFutures.clear()
+ krillCompressionStartNanos.clear()
corpusMetadata.clear()
docMetadata.clear()
zipInventory.clear()
@@ -3453,6 +3463,65 @@
"inFlight=${krillPeakCompressionInFlight.get()},ready=${krillPeakReadyQueueDepth.get()}}" +
compressionPoolStats
)
+
+ val noProgressFor = if (lastKrillOutputProgressNanos == 0L) 0L else now - lastKrillOutputProgressNanos
+ val shouldLogPendingDetails = rawPending > 0 && (
+ force || (
+ rawPending <= 4 &&
+ noProgressFor >= krillNoProgressWarnNanos &&
+ now - lastKrillPendingDetailLogNanos >= krillPendingDetailLogIntervalNanos
+ )
+ )
+ if (shouldLogPendingDetails) {
+ lastKrillPendingDetailLogNanos = now
+ logPendingKrillTexts(now, noProgressFor)
+ }
+ }
+
+ /**
+  * Logs which Krill texts are still present in [krillData] but not yet in [outputTexts].
+  *
+  * Emits one `KRILL-PENDING` summary line that lists every pending text ID, followed by one
+  * detail line for each of the first six IDs in [compareTextIds] order. Nothing is logged when
+  * no text is pending.
+  *
+  * The maps are read one after another without any locking in this function, so the flags on a
+  * detail line need not describe one single consistent moment.
+  *
+  * @param nowNanos reference timestamp compared against the start stamps stored in
+  *   [krillCompressionStartNanos]; a negative difference is reported as zero.
+  * @param noProgressNanos duration reported as `noProgress`; it is reduced to whole seconds by
+  *   integer division before being handed to [formatDuration].
+  */
+ private fun logPendingKrillTexts(nowNanos: Long, noProgressNanos: Long) {
+     val pendingTextIds = krillData.keys
+         .filter { candidate -> !outputTexts.contains(candidate) }
+         .sortedWith(this::compareTextIds)
+     if (pendingTextIds.isEmpty()) return
+
+     val foundryTasksRemaining = foundryTaskCounts.values.sumOf { counter -> counter.get() }
+     val noProgressSeconds = noProgressNanos / 1_000_000_000L
+     LOGGER.info(
+         "KRILL-PENDING count=${pendingTextIds.size} noProgress=${formatDuration(noProgressSeconds)} " +
+             "foundryTasksRemaining=$foundryTasksRemaining ids=${pendingTextIds.joinToString(",")}"
+     )
+
+     // Detail lines are limited to the first six texts; the summary line above still names all of them.
+     pendingTextIds.take(6).forEach { textId ->
+         val future = krillCompressionFutures[textId]
+         val compressionAgeSeconds = krillCompressionStartNanos[textId]?.let { started ->
+             (nowNanos - started).coerceAtLeast(0L) / 1_000_000_000L
+         }
+         val expectedFoundries = expectedFoundriesPerText[textId]
+         val flowState = if (expectedFoundries != null) {
+             // Foundry bookkeeping is only looked up when an expected set is recorded for the text.
+             val completedFoundries = textFoundryCompletion[textId]?.toSortedSet() ?: sortedSetOf()
+             val missingFoundries = expectedFoundries.minus(completedFoundries)
+             "expected=${expectedFoundries.size},completed=${completedFoundries.size},missing=${missingFoundries.sorted()}"
+         } else {
+             // The ZIP inventory is scanned only here, where its result is actually reported.
+             val legacyRelevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys.sorted()
+             val processedZipCount = legacyRelevantZips.count { path ->
+                 processedTextsPerZip[path]?.contains(textId) == true
+             }
+             "legacyZips=$processedZipCount/${legacyRelevantZips.size}"
+         }
+         LOGGER.info(
+             "KRILL-PENDING text=$textId state{" +
+                 "raw=${krillData.containsKey(textId)}," +
+                 "compressed=${krillCompressedData.containsKey(textId)}," +
+                 "future=${future != null}," +
+                 "futureDone=${future?.isDone ?: false}," +
+                 "futureCancelled=${future?.isCancelled ?: false}," +
+                 "compressionAge=${compressionAgeSeconds?.let(::formatDuration) ?: "-"}," +
+                 "$flowState}"
+         )
+     }
+ }
+
+ /**
+  * Stamps [lastKrillOutputProgressNanos] with the current `System.nanoTime()` reading.
+  *
+  * The stamp is only meaningful as a difference against another `System.nanoTime()` value.
+  */
+ private fun noteKrillOutputProgress() {
+     val outputInstantNanos = System.nanoTime()
+     lastKrillOutputProgressNanos = outputInstantNanos
}
private fun conlluOutput(foundry: String, docId: String): StringBuilder {
@@ -5535,6 +5604,7 @@
e.printStackTrace()
} finally {
krillCompressionFutures.remove(textId)
+ krillCompressionStartNanos.remove(textId)
}
}
@@ -5556,6 +5626,7 @@
if (existing != null) {
future.cancel(false)
} else {
+ krillCompressionStartNanos[textId] = System.nanoTime()
updatePeakCounter(krillPeakRawPending, krillData.size)
updatePeakCounter(krillPeakCompressionInFlight, krillCompressionFutures.size)
}
@@ -5609,6 +5680,7 @@
}
krillOutputCount.incrementAndGet()
+ noteKrillOutputProgress()
incrementalProgressBar?.step()
outputCount++
LOGGER.fine("Output text $textId (processed by ${relevantZips.size} ZIPs, ${krillData.size} raw pending)")
@@ -5626,6 +5698,7 @@
krillData.remove(textId)
krillCompressedData.remove(textId)
krillCompressionFutures.remove(textId)
+ krillCompressionStartNanos.remove(textId)
val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
relevantZips.forEach { path ->
@@ -5879,6 +5952,7 @@
}
val count = krillOutputCount.incrementAndGet()
+ noteKrillOutputProgress()
LOGGER.fine("Output Krill JSON for $textId ($count total)")
// Free memory: remove text data from all data structures after successful output