Log stalled pending Krill texts

Change-Id: I8dbba5ad96e6b2592eae291bca4051fe844986d0
diff --git a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
index a90213c..5403f5b 100644
--- a/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
+++ b/app/src/main/kotlin/de/ids_mannheim/korapxmltools/KorapXmlTool.kt
@@ -1183,6 +1183,7 @@
     val krillData: ConcurrentHashMap<String, KrillJsonGenerator.KrillTextData> = ConcurrentHashMap()
     val krillCompressedData: ConcurrentHashMap<String, CompressedKrillData> = ConcurrentHashMap()
     val krillCompressionFutures: ConcurrentHashMap<String, java.util.concurrent.Future<*>> = ConcurrentHashMap()
+    val krillCompressionStartNanos: ConcurrentHashMap<String, Long> = ConcurrentHashMap()
     val readyKrillTextIds: java.util.concurrent.BlockingQueue<String> = java.util.concurrent.LinkedBlockingQueue()
     val corpusMetadata: ConcurrentHashMap<String, MutableMap<String, Any>> = ConcurrentHashMap()
     val docMetadata: ConcurrentHashMap<String, MutableMap<String, Any>> = ConcurrentHashMap()
@@ -1195,6 +1196,10 @@
     private val krillPeakReadyQueueDepth = AtomicInteger(0)
     @Volatile private var lastKrillStateLogNanos = 0L
     private val krillStateLogIntervalNanos = java.util.concurrent.TimeUnit.SECONDS.toNanos(30)
+    @Volatile private var lastKrillOutputProgressNanos = 0L
+    @Volatile private var lastKrillPendingDetailLogNanos = 0L
+    private val krillPendingDetailLogIntervalNanos = java.util.concurrent.TimeUnit.MINUTES.toNanos(2)
+    private val krillNoProgressWarnNanos = java.util.concurrent.TimeUnit.MINUTES.toNanos(5)
 
     // Compression thread pool for parallel GZIP compression
     var compressionExecutor: java.util.concurrent.ExecutorService? = null
@@ -1251,7 +1256,12 @@
             krillPeakCompressionInFlight.set(0)
             krillPeakReadyQueueDepth.set(0)
             lastKrillStateLogNanos = 0L
+            lastKrillOutputProgressNanos = System.nanoTime()
+            lastKrillPendingDetailLogNanos = 0L
             krillData.clear()
+            krillCompressedData.clear()
+            krillCompressionFutures.clear()
+            krillCompressionStartNanos.clear()
             corpusMetadata.clear()
             docMetadata.clear()
             zipInventory.clear()
@@ -3453,6 +3463,65 @@
                 "inFlight=${krillPeakCompressionInFlight.get()},ready=${krillPeakReadyQueueDepth.get()}}" +
                 compressionPoolStats
         )
+
+        val noProgressFor = if (lastKrillOutputProgressNanos == 0L) 0L else now - lastKrillOutputProgressNanos
+        val shouldLogPendingDetails = rawPending > 0 && (
+            force || (
+                rawPending <= 4 &&
+                    noProgressFor >= krillNoProgressWarnNanos &&
+                    now - lastKrillPendingDetailLogNanos >= krillPendingDetailLogIntervalNanos
+                )
+            )
+        if (shouldLogPendingDetails) {
+            lastKrillPendingDetailLogNanos = now
+            logPendingKrillTexts(now, noProgressFor)
+        }
+    }
+
+    /** Logs the ids of texts still in [krillData] but absent from `outputTexts`, then per-text state for the first few.
+     *  [nowNanos] must be on the `System.nanoTime()` clock (it is compared with compression start stamps). */
+    private fun logPendingKrillTexts(nowNanos: Long, noProgressNanos: Long) {
+        val maxListedIds = 50 // a forced log can see a very large backlog; keep the summary line bounded
+        val maxDetailedTexts = 6
+        val pendingTextIds = krillData.keys.filterNot { outputTexts.contains(it) }.sortedWith(this::compareTextIds)
+        if (pendingTextIds.isEmpty()) return
+
+        val foundryTasksRemaining = foundryTaskCounts.values.sumOf { it.get() }
+        val omittedIds = (pendingTextIds.size - maxListedIds).coerceAtLeast(0)
+        val listedIds = pendingTextIds.take(maxListedIds).joinToString(",") +
+            (if (omittedIds > 0) ",...(+$omittedIds more)" else "")
+        LOGGER.info(
+            "KRILL-PENDING count=${pendingTextIds.size} noProgress=${formatDuration(noProgressNanos / 1_000_000_000L)} " +
+                "foundryTasksRemaining=$foundryTasksRemaining ids=$listedIds"
+        )
+
+        pendingTextIds.take(maxDetailedTexts).forEach { textId ->
+            val future = krillCompressionFutures[textId]
+            val expectedFoundries = expectedFoundriesPerText[textId]
+            val completedFoundries = textFoundryCompletion[textId]?.toSortedSet() ?: sortedSetOf()
+            val missingFoundries = expectedFoundries?.minus(completedFoundries) ?: emptySet()
+            val legacyRelevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys.sorted()
+            val processedZipCount = legacyRelevantZips.count { processedTextsPerZip[it]?.contains(textId) == true }
+            // Null when no compression start stamp is recorded for this text; rendered as "-" below.
+            val compressionAgeSeconds = krillCompressionStartNanos[textId]
+                ?.let { started -> (nowNanos - started).coerceAtLeast(0L) / 1_000_000_000L }
+            val flowState = if (expectedFoundries != null) {
+                "expected=${expectedFoundries.size},completed=${completedFoundries.size},missing=${missingFoundries.sorted()}"
+            } else {
+                "legacyZips=$processedZipCount/${legacyRelevantZips.size}"
+            }
+            LOGGER.info(
+                "KRILL-PENDING text=$textId state{" +
+                    "raw=${krillData.containsKey(textId)},compressed=${krillCompressedData.containsKey(textId)}," +
+                    "future=${future != null},futureDone=${future?.isDone ?: false}," +
+                    "futureCancelled=${future?.isCancelled ?: false}," +
+                    "compressionAge=${compressionAgeSeconds?.let(::formatDuration) ?: "-"},$flowState}"
+            )
+        }
+    }
+
+    private fun noteKrillOutputProgress() { // stamps the time of the latest Krill output; the no-progress check measures from it
+        lastKrillOutputProgressNanos = System.nanoTime()
     }
 
     private fun conlluOutput(foundry: String, docId: String): StringBuilder {
@@ -5535,6 +5604,7 @@
             e.printStackTrace()
         } finally {
             krillCompressionFutures.remove(textId)
+            krillCompressionStartNanos.remove(textId)
         }
     }
 
@@ -5556,6 +5626,7 @@
         if (existing != null) {
             future.cancel(false)
         } else {
+            krillCompressionStartNanos[textId] = System.nanoTime()
             updatePeakCounter(krillPeakRawPending, krillData.size)
             updatePeakCounter(krillPeakCompressionInFlight, krillCompressionFutures.size)
         }
@@ -5609,6 +5680,7 @@
                             }
 
                             krillOutputCount.incrementAndGet()
+                            noteKrillOutputProgress()
                             incrementalProgressBar?.step()
                             outputCount++
                             LOGGER.fine("Output text $textId (processed by ${relevantZips.size} ZIPs, ${krillData.size} raw pending)")
@@ -5626,6 +5698,7 @@
             krillData.remove(textId)
             krillCompressedData.remove(textId)
             krillCompressionFutures.remove(textId)
+            krillCompressionStartNanos.remove(textId)
 
             val relevantZips = zipInventory.filter { (_, texts) -> texts.contains(textId) }.keys
             relevantZips.forEach { path ->
@@ -5879,6 +5952,7 @@
             }
 
             val count = krillOutputCount.incrementAndGet()
+            noteKrillOutputProgress()
             LOGGER.fine("Output Krill JSON for $textId ($count total)")
 
             // Free memory: remove text data from all data structures after successful output