Unordered distance spans for tokens
diff --git a/src/main/java/de/ids_mannheim/korap/query/spans/CandidateSpan.java b/src/main/java/de/ids_mannheim/korap/query/spans/CandidateSpan.java
index 1db18ef..d8727f2 100644
--- a/src/main/java/de/ids_mannheim/korap/query/spans/CandidateSpan.java
+++ b/src/main/java/de/ids_mannheim/korap/query/spans/CandidateSpan.java
@@ -33,6 +33,15 @@
 		this.position = position;		
 	}
 	
+	public CandidateSpan(int start, int end, int doc, long cost,
+			Collection<byte[]> payloads) {
+		this.start = start;
+		this.end = end;
+		this.doc = doc;
+		this.cost = cost;
+		this.payloads = payloads;
+	}
+
 	public int getDoc() {
 		return doc;
 	}
diff --git a/src/main/java/de/ids_mannheim/korap/query/spans/DistanceSpan.java b/src/main/java/de/ids_mannheim/korap/query/spans/DistanceSpan.java
index 2f48020..ca643a9 100644
--- a/src/main/java/de/ids_mannheim/korap/query/spans/DistanceSpan.java
+++ b/src/main/java/de/ids_mannheim/korap/query/spans/DistanceSpan.java
@@ -16,7 +16,8 @@
 
 /** DistanceSpan is a base class for enumeration of span matches, 
  * 	whose two child spans have a specific range of distance (within 
- * 	a min and a max distance). 
+ * 	a min and a max distance) and must be in order (a firstspan is 
+ * 	followed by a secondspan). 
  * 
  * @author margaretha
  * */
diff --git a/src/main/java/de/ids_mannheim/korap/query/spans/ElementDistanceSpan.java b/src/main/java/de/ids_mannheim/korap/query/spans/ElementDistanceSpan.java
index d51a70d..389e989 100644
--- a/src/main/java/de/ids_mannheim/korap/query/spans/ElementDistanceSpan.java
+++ b/src/main/java/de/ids_mannheim/korap/query/spans/ElementDistanceSpan.java
@@ -18,7 +18,7 @@
  * 	where the child spans are. The element-distance unit can be a sentence 
  * 	or a paragraph. All other child spans occurrence which are not in 
  * 	a sentence or a paragraph (with respect to the element distance type 
- * 	current used) are ignored.
+ * 	current used), are ignored.
  * 
  * @author margaretha
  * */
diff --git a/src/main/java/de/ids_mannheim/korap/query/spans/SimpleSpans.java b/src/main/java/de/ids_mannheim/korap/query/spans/SimpleSpans.java
index 189ee8c..0b7b08c 100644
--- a/src/main/java/de/ids_mannheim/korap/query/spans/SimpleSpans.java
+++ b/src/main/java/de/ids_mannheim/korap/query/spans/SimpleSpans.java
@@ -15,7 +15,7 @@
 import de.ids_mannheim.korap.query.SimpleSpanQuery;
 
 /** An abstract class for Span enumeration including span match properties
- * 	and basic methods;
+ * 	and basic methods.
  *  
  * 	@author margaretha
  * 
diff --git a/src/main/java/de/ids_mannheim/korap/query/spans/UnorderedDistanceSpans.java b/src/main/java/de/ids_mannheim/korap/query/spans/UnorderedDistanceSpans.java
index 9bc0d78..018ce1d 100644
--- a/src/main/java/de/ids_mannheim/korap/query/spans/UnorderedDistanceSpans.java
+++ b/src/main/java/de/ids_mannheim/korap/query/spans/UnorderedDistanceSpans.java
@@ -1,39 +1,251 @@
 package de.ids_mannheim.korap.query.spans;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.lucene.index.AtomicReaderContext;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.index.TermContext;
+import org.apache.lucene.search.spans.Spans;
 import org.apache.lucene.util.Bits;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import de.ids_mannheim.korap.query.SpanDistanceQuery;
 
+/** Enumeration of span matches, whose two child spans have a specific 
+ * 	range of distance (within a min and a max distance) and can be in 
+ * 	any order. 
+ * @author margaretha
+ * */
 public class UnorderedDistanceSpans extends SimpleSpans{
 
+	private int minDistance, maxDistance;
+	private boolean collectPayloads;	
+	
+	private boolean hasMoreFirstSpans, hasMoreSecondSpans;
+	private List<CandidateSpan> firstSpanList, secondSpanList;
+	
+	private List<CandidateSpan> matchList;
+	private long matchCost;
+	
+	private Logger log = LoggerFactory.getLogger(UnorderedDistanceSpans.class);
+	
 	public UnorderedDistanceSpans(SpanDistanceQuery query,
 			AtomicReaderContext context, Bits acceptDocs,
 			Map<Term, TermContext> termContexts) throws IOException {
 		super(query, context, acceptDocs, termContexts);
+		minDistance = query.getMinDistance();
+		maxDistance =  query.getMaxDistance();
+		collectPayloads = query.isCollectPayloads();
+		
+		firstSpanList = new ArrayList<CandidateSpan>();
+		secondSpanList = new ArrayList<CandidateSpan>();
+		matchList = new ArrayList<CandidateSpan>();
+		
+		hasMoreFirstSpans = firstSpans.next();
+		hasMoreSecondSpans = secondSpans.next();
+		hasMoreSpans = hasMoreFirstSpans && hasMoreSecondSpans;
 	}
 
 	@Override
 	public boolean next() throws IOException {
-		// TODO Auto-generated method stub
+		isStartEnumeration = false;
+		matchPayload.clear();
+		return advance();
+	}
+	
+	/** Find the next span match.
+	 * @return true iff a span match is available.
+	 * */
+	private boolean advance() throws IOException {
+		while (hasMoreSpans || !matchList.isEmpty()){			
+			if (!matchList.isEmpty()){
+				setMatch();
+				return true;
+			}
+			
+			if (firstSpanList.isEmpty() && secondSpanList.isEmpty()){
+				
+				if (hasMoreFirstSpans && hasMoreSecondSpans &&
+						ensureSameDoc(firstSpans, secondSpans)){
+				
+					firstSpanList.add(new CandidateSpan(firstSpans));
+					secondSpanList.add(new CandidateSpan(secondSpans));
+					hasMoreFirstSpans = firstSpans.next();
+					hasMoreSecondSpans = secondSpans.next();
+					
+					setMatchList();
+				}
+				else { hasMoreSpans = false; }
+			}
+			else { setMatchList(); }
+			
+		}
 		return false;
 	}
+	
+	/** Set the list of matches between the span having the smallest position, and 
+	 * 	its candidates. Simply remove the span if it does not have any candidates.
+	 * */
+	private void setMatchList() throws IOException {
+		
+		hasMoreFirstSpans = setCandidateList(firstSpanList,firstSpans,
+				hasMoreFirstSpans,secondSpanList);
+		hasMoreSecondSpans = setCandidateList(secondSpanList,secondSpans,
+				hasMoreSecondSpans,firstSpanList);		
+	
+		CandidateSpan currentFirstSpan, currentSecondSpan;
+		if (!firstSpanList.isEmpty() && !secondSpanList.isEmpty()){
+			
+			currentFirstSpan = firstSpanList.get(0)	;
+			currentSecondSpan = secondSpanList.get(0);
+			
+			if (currentFirstSpan.getEnd() < currentSecondSpan.getEnd()){
+				matchList = findMatches(currentFirstSpan, secondSpanList);
+				firstSpanList.remove(0);
+			}
+			else {
+				matchList = findMatches(currentSecondSpan, firstSpanList);
+				secondSpanList.remove(0);
+			}
+		}
+		else if (firstSpanList.isEmpty())
+			secondSpanList.remove(0);
+		else firstSpanList.remove(0);		
+	}
+	
+	/** Search all matches between the target span and its candidates in the candidate 
+	 * 	list.
+	 * 	@return the matches in a list 
+	 * */
+	private List<CandidateSpan> findMatches(CandidateSpan target, List<CandidateSpan> 
+		candidateList) {
+		
+		List<CandidateSpan> matches = new ArrayList<>();		
+		int actualDistance;
+		for (CandidateSpan cs : candidateList){
+			if (minDistance == 0 &&
+					//intersection
+					target.getStart() < cs.getEnd() &&
+					cs.getStart() < target.getEnd()){
+				matches.add(createMatchCandidate(target,cs,true));
+				continue;
+			}
+			
+			// left candidate
+			if (cs.getEnd() < target.getStart())
+				actualDistance = target.getStart() - cs.getEnd() +1;			 
+			else // right candidate
+				actualDistance = cs.getStart() - target.getEnd() +1;
+			 
+			if (minDistance <= actualDistance && actualDistance <= maxDistance)
+				matches.add(createMatchCandidate(target, cs, false));			
+		}		
+		return matches;
+	}
+	
+	/** Compute match properties and create a candidate span match 
+	 * 	to be added to the match list.
+	 * 	@return a candidate span match 
+	 * */
+	private CandidateSpan createMatchCandidate(CandidateSpan target,
+			CandidateSpan cs, boolean isDistanceZero) {
+		
+		int start = Math.min(target.getStart(), cs.getStart());
+		int end = Math.max(target.getEnd(),cs.getEnd());
+		int doc = target.getDoc();
+		long cost = target.getCost() + cs.getCost();
+		
+		Collection<byte[]> payloads = new LinkedList<byte[]>();
+		if (collectPayloads) {
+			if (target.getPayloads() != null){
+				payloads.addAll(target.getPayloads());
+			}
+			if (cs.getPayloads() != null){
+				payloads.addAll(cs.getPayloads());
+			}
+		}
+		return new CandidateSpan(start,end,doc,cost,payloads);
+	}
+
+	/** Set the candidate list for the first element in the target list.
+	 * @return true iff the spans enumeration still has a next element 
+	 * to be a candidate
+	 */
+	private boolean setCandidateList(List<CandidateSpan> 
+			candidateList, Spans candidate, boolean hasMoreCandidates,
+			List<CandidateSpan> targetList) throws IOException {
+		
+		if (!targetList.isEmpty()){
+			CandidateSpan target = targetList.get(0);
+			while (hasMoreCandidates && candidate.doc() == target.getDoc()
+					&& isWithinMaxDistance(target,candidate)){
+				candidateList.add(new CandidateSpan(candidate));
+				hasMoreCandidates = candidate.next();
+			}
+		}
+		return hasMoreCandidates;
+	}
+
+	
+	/** Check if the target and candidate spans are not too far from each other.
+	 *  @return true iff the target and candidate spans are within the maximum
+	 *  distance
+	 * */
+	private boolean isWithinMaxDistance(CandidateSpan target, Spans candidate) {
+		// left candidate
+		if (candidate.end() < target.getStart() && 
+				candidate.end() + maxDistance <= target.getStart()){
+			return false;
+		}
+		// right candidate
+		if (candidate.start() > target.getEnd() &&
+				target.getEnd() + maxDistance <= candidate.start()){
+			return false;
+		}
+		return true;
+	}
+
+	/** Assign the first candidate span in the match list as 
+	 * 	the current span match.
+	 * */
+	private void setMatch() {
+		CandidateSpan cs = matchList.get(0);
+		matchDocNumber = cs.getDoc();
+		matchStartPosition = cs.getStart();
+		matchEndPosition = cs.getEnd();
+		matchCost = cs.getCost();
+		matchPayload.addAll(cs.getPayloads());
+		matchList.remove(0);
+		
+		log.trace("Match doc#={} start={} end={}", matchDocNumber,
+				matchStartPosition,matchEndPosition);
+	}
 
 	@Override
 	public boolean skipTo(int target) throws IOException {
-		// TODO Auto-generated method stub
-		return false;
+		if (hasMoreSpans && (secondSpans.doc() < target)){
+  			if (!secondSpans.skipTo(target)){
+  				hasMoreSpans = false;
+  				return false;
+  			}  			
+  		}
+		
+		firstSpanList.clear();
+		secondSpanList.clear();
+		matchPayload.clear();
+		isStartEnumeration=false;
+		return advance();		
 	}
 
 	@Override
 	public long cost() {
-		// TODO Auto-generated method stub
-		return 0;
+		return matchCost;
 	}
 
 }
diff --git a/src/test/java/de/ids_mannheim/korap/index/TestDistanceIndex.java b/src/test/java/de/ids_mannheim/korap/index/TestDistanceIndex.java
index 8e131d5..a1ff477 100644
--- a/src/test/java/de/ids_mannheim/korap/index/TestDistanceIndex.java
+++ b/src/test/java/de/ids_mannheim/korap/index/TestDistanceIndex.java
@@ -22,7 +22,7 @@
     KorapResult kr;
     KorapIndex ki;   
  
-    protected FieldDocument createFieldDoc0() {
+    private FieldDocument createFieldDoc0() {
     	FieldDocument fd = new FieldDocument();
         fd.addString("ID", "doc-0");
         fd.addTV("base",
@@ -36,7 +36,7 @@
         return fd;
 	}
     
-    protected FieldDocument createFieldDoc1(){
+    private FieldDocument createFieldDoc1(){
     	FieldDocument fd = new FieldDocument();
         fd.addString("ID", "doc-1");
         fd.addTV("base",
@@ -54,7 +54,7 @@
         return fd;
     }
     
-    protected FieldDocument createFieldDoc2() {
+    private FieldDocument createFieldDoc2() {
     	FieldDocument fd = new FieldDocument();
         fd.addString("ID", "doc-2");
         fd.addTV("base",
@@ -68,7 +68,7 @@
         return fd;
 	}
     
-    protected SpanQuery createQuery(String x, String y, int min, int max, boolean isOrdered){
+    private SpanQuery createQuery(String x, String y, int min, int max, boolean isOrdered){
     	SpanQuery sq = new SpanDistanceQuery(
         		new SpanTermQuery(new Term("base",x)),
         		new SpanTermQuery(new Term("base",y)),
@@ -80,7 +80,7 @@
     	return sq;
     }
     
-    protected SpanQuery createElementQuery(String x, String y, int min, int max, boolean isOrdered){
+    private SpanQuery createElementQuery(String x, String y, int min, int max, boolean isOrdered){
     	SpanQuery sq = new SpanDistanceQuery(
         		new SpanElementQuery("base",x),
         		new SpanElementQuery("base",y),
diff --git a/src/test/java/de/ids_mannheim/korap/index/TestUnorderedDistanceIndex.java b/src/test/java/de/ids_mannheim/korap/index/TestUnorderedDistanceIndex.java
new file mode 100644
index 0000000..d4385f6
--- /dev/null
+++ b/src/test/java/de/ids_mannheim/korap/index/TestUnorderedDistanceIndex.java
@@ -0,0 +1,203 @@
+package de.ids_mannheim.korap.index;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.spans.SpanQuery;
+import org.apache.lucene.search.spans.SpanTermQuery;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import de.ids_mannheim.korap.KorapIndex;
+import de.ids_mannheim.korap.KorapResult;
+import de.ids_mannheim.korap.query.SpanDistanceQuery;
+import de.ids_mannheim.korap.query.SpanElementQuery;
+import de.ids_mannheim.korap.query.SpanNextQuery;
+import de.ids_mannheim.korap.query.SpanSegmentQuery;
+
+@RunWith(JUnit4.class)
+public class TestUnorderedDistanceIndex{
+	
+    private KorapIndex ki;
+	private KorapResult kr;
+
+	private FieldDocument createFieldDoc0(){
+    	FieldDocument fd = new FieldDocument();
+        fd.addString("ID", "doc-0");
+        fd.addTV("base",
+            "text",
+            "[(0-1)s:c|_1#0-1]" +
+            "[(1-2)s:e|_2#1-2]" +             
+            "[(2-3)s:c|_3#2-3|<>:y#2-4$<i>4]" +
+            "[(3-4)s:c|_4#3-4|<>:x#3-7$<i>7]" + 
+            "[(4-5)s:d|_5#4-5|<>:y#4-6$<i>6]" +             
+            "[(5-6)s:c|_6#5-6|<>:y#5-8$<i>8]" +
+            "[(6-7)s:d|_7#6-7]" +
+            "[(7-8)s:f|_8#7-8|<>:x#7-9$<i>9]" + 
+            "[(8-9)s:e|_9#8-9|<>:x#8-10$<i>10]" + 
+            "[(9-10)s:d|_10#9-10]");
+        return fd;
+    }
+	
+	private FieldDocument createFieldDoc1(){
+		FieldDocument fd = new FieldDocument();
+        fd.addString("ID", "doc-1");
+        fd.addTV("base",
+            "text",             
+            "[(0-1)s:d|_1#0-1]" +
+            "[(1-2)s:c|_2#1-2]" +             
+            "[(2-3)s:e|_3#2-3]" +
+            "[(3-4)s:e|_4#3-4]" + 
+            "[(4-5)s:d|_5#4-5]" +             
+            "[(5-6)s:e|_6#5-6]" +
+            "[(6-7)s:e|_7#6-7]" +
+            "[(7-8)s:c|_8#7-8]" + 
+            "[(8-9)s:e|_9#8-9]" + 
+            "[(9-10)s:d|_10#9-10]");
+        return fd;
+	}
+	
+	private FieldDocument createFieldDoc2(){
+		FieldDocument fd = new FieldDocument();
+        fd.addString("ID", "doc-2");
+        fd.addTV("base",
+            "text",             
+            "[(0-1)s:f|_1#0-1]" +
+            "[(1-2)s:c|_2#1-2]" +             
+            "[(2-3)s:e|_3#2-3]" +
+            "[(3-4)s:e|_4#3-4]" + 
+            "[(4-5)s:d|_5#4-5]" +             
+            "[(5-6)s:f|_6#5-6]" +
+            "[(6-7)s:f|_7#6-7]" );
+        return fd;
+	}
+	
+    private SpanQuery createQuery(String x, String y, int min, int max, 
+    		boolean isOrdered){
+    	SpanQuery sq = new SpanDistanceQuery(
+        		new SpanTermQuery(new Term("base",x)),
+        		new SpanTermQuery(new Term("base",y)),
+        		min,
+        		max,
+        		isOrdered,
+        		true
+        );    	 
+    	return sq;
+    }
+	
+    private SpanQuery createElementQuery(String x, String y, int min, int max, 
+    		boolean isOrdered){
+    	SpanQuery sq = new SpanDistanceQuery(
+        		new SpanElementQuery("base",x),
+        		new SpanElementQuery("base",y),
+        		min,
+        		max,
+        		isOrdered,
+        		true
+        );
+    	return sq;
+    }
+    
+    /** One document, multiple occurrences
+     * 	The first first and second spans are too far from each other
+     * 	One of the spans ends first
+     * 	One of the candidate list is empty
+     * */
+	@Test
+	public void testCase1() throws IOException{
+		 System.out.println("testcase 1");
+		ki = new KorapIndex();
+	    ki.addDoc(createFieldDoc0()); 
+	    ki.commit();    
+	    
+	    SpanQuery sq = createQuery("s:c","s:d",0,3,false);                
+	    kr = ki.search(sq, (short) 10);
+	    	    
+	    assertEquals(5, kr.totalResults());
+	}
+	
+	/** Multiple documents 
+	 * 	Ensure same doc
+	 * 	Both candidate lists are empty, but there is a span left in the doc
+	 * 	Both candidate lists are empty, but there are more matches in the doc
+	 * @throws IOException 
+	 * */
+	@Test
+	public void testCase2() throws IOException{
+		System.out.println("testcase 2");
+		ki = new KorapIndex();
+		ki.addDoc(createFieldDoc0());
+	    ki.addDoc(createFieldDoc1()); 
+	    ki.commit();    
+	    
+	    SpanQuery sq = createQuery("s:c","s:d",1,2,false);                
+	    kr = ki.search(sq, (short) 10);
+	    	    
+	    assertEquals(6, kr.totalResults());
+		
+	}
+	
+	/** Multiple documents 
+	 * 	Ensure same Doc
+	 * @throws IOException 
+	 * */
+	@Test
+	public void testCase3() throws IOException{		
+		System.out.println("testcase 3");
+		ki = new KorapIndex();
+		ki.addDoc(createFieldDoc0());
+	    ki.addDoc(createFieldDoc1()); 
+	    ki.addDoc(createFieldDoc2());
+	    ki.commit();    
+	    
+	    SpanQuery sq = createQuery("s:e","s:f",1,2,false);                
+	    kr = ki.search(sq, (short) 10);
+	    	    
+	    assertEquals(3, kr.totalResults());
+	    assertEquals(0, kr.match(0).getLocalDocID());
+	    assertEquals(7, kr.match(0).getStartPos());
+	    assertEquals(9, kr.match(0).getEndPos());
+	    assertEquals(2, kr.match(1).getLocalDocID());
+	    assertEquals(0, kr.match(1).getStartPos());
+	    assertEquals(3, kr.match(1).getEndPos());
+	}
+	
+	/** Skip to */
+	@Test
+	public void testCase4() throws IOException{
+		System.out.println("testcase 4");
+		ki = new KorapIndex();
+		ki.addDoc(createFieldDoc0());
+	    ki.addDoc(createFieldDoc1()); 
+	    ki.addDoc(createFieldDoc2());
+	    ki.commit();
+	    
+	    SpanQuery sq = new SpanNextQuery(
+	    		createQuery("s:d","s:e",1,2,false),
+	    		new SpanTermQuery(new Term("base","s:f"))	    		
+		);
+	    
+	    kr = ki.search(sq, (short) 10);
+	    assertEquals(2, kr.totalResults());
+	    assertEquals(2,kr.getMatch(0).getLocalDocID());
+	    assertEquals(2,kr.getMatch(0).getStartPos());
+	    assertEquals(6,kr.getMatch(0).getEndPos());
+	    assertEquals(3,kr.getMatch(1).getStartPos());
+	    assertEquals(6,kr.getMatch(1).getEndPos());
+	    
+	    System.out.print(kr.getTotalResults()+"\n");
+			for (int i=0; i< kr.getTotalResults(); i++){
+				System.out.println(
+					kr.match(i).getLocalDocID()+" "+
+					kr.match(i).startPos + " " +
+					kr.match(i).endPos
+			    );
+			}
+	}
+	
+	
+
+}