View Javadoc

1   /*
2    * Copyright James Leigh (c) 2007.
3    *
4    * Licensed under the Aduna BSD-style license.
5    */
6   package org.openrdf.repository.flushable;
7   
8   import info.aduna.iteration.CloseableIteration;
9   import info.aduna.iteration.FilterIteration;
10  
11  import java.util.Iterator;
12  
13  import org.openrdf.model.Resource;
14  import org.openrdf.model.Statement;
15  import org.openrdf.model.URI;
16  import org.openrdf.model.Value;
17  import org.openrdf.repository.Repository;
18  import org.openrdf.repository.RepositoryConnection;
19  import org.openrdf.repository.RepositoryException;
20  import org.openrdf.repository.RepositoryResult;
21  import org.openrdf.repository.base.RepositoryConnectionWrapper;
22  import org.openrdf.repository.sail.SailRepository;
23  import org.openrdf.repository.util.RDFInserter;
24  import org.openrdf.rio.RDFHandler;
25  import org.openrdf.rio.RDFHandlerException;
26  import org.openrdf.sail.memory.MemoryStore;
27  
28  /**
29   * When not in autoFlush mode, each statement added and removed is
30   * tracked. When calling one of the read methods listed below the statement
31   * results will be corrected to include or exclude the modified statements on
32   * this connection. This behaviour mimics transaction isolation, but does not
33   * include unflushed statements in query processing.
34   * 
35   * @author James Leigh
36   * @see #getStatements(Resource, URI, Value, boolean, Resource[])
37   * @see #hasStatement(Statement, boolean, Resource[])
38   * @see #hasStatement(Resource, URI, Value, boolean, Resource[])
39   * @see #isEmpty()
40   * @see #export(RDFHandler, Resource[])
41   * @see #exportStatements(Resource, URI, Value, boolean, RDFHandler, Resource[])
42   * 
43   */
44  public class FlushableConnection extends RepositoryConnectionWrapper {
45  
46  	private boolean autoFlush = true;
47  
48  	private RepositoryConnection added;
49  
50  	RepositoryConnection removed;
51  
52  	public FlushableConnection(Repository repository)
53  			throws RepositoryException {
54  		this(repository, repository.getConnection());
55  	}
56  
57  	public FlushableConnection(Repository repository, RepositoryConnection conn)
58  			throws RepositoryException {
59  		super(repository, conn);
60  		if (!autoFlush) {
61  			initUnFlushedStatements();
62  		}
63  	}
64  
65  	public boolean isAutoFlush() throws RepositoryException {
66  		return autoFlush;
67  	}
68  
69  	public void setAutoFlush(boolean autoFlush) throws RepositoryException {
70  		if (this.autoFlush == autoFlush)
71  			return;
72  		if (autoFlush) {
73  			flush();
74  			destroyUnflushedStatements();
75  		} else {
76  			initUnFlushedStatements();
77  		}
78  		this.autoFlush = autoFlush;
79  	}
80  
81  	public void flush() throws RepositoryException {
82  		if (autoFlush)
83  			return;
84  		RepositoryResult<Statement> stmts;
85  		if (!added.isEmpty()) {
86  			stmts = added.getStatements(null, null, null, false);
87  			try {
88  				getDelegate().add(stmts);
89  			} finally {
90  				stmts.close();
91  			}
92  			added.clear();
93  		}
94  		if (!removed.isEmpty()) {
95  			stmts = removed.getStatements(null, null, null, false);
96  			try {
97  				while (stmts.hasNext()) {
98  					getDelegate().remove(stmts.next());
99  				}
100 			} finally {
101 				stmts.close();
102 			}
103 			removed.clear();
104 		}
105 	}
106 
107 	public void clear() throws RepositoryException {
108 		if (autoFlush)
109 			return;
110 		added.clear();
111 		removed.clear();
112 	}
113 
114 	@Override
115 	protected boolean isDelegatingAdd() throws RepositoryException {
116 		return autoFlush;
117 	}
118 
119 	@Override
120 	protected boolean isDelegatingRead() throws RepositoryException {
121 		if (autoFlush)
122 			return true;
123 		return added.isEmpty() && removed.isEmpty();
124 	}
125 
126 	@Override
127 	protected boolean isDelegatingRemove() throws RepositoryException {
128 		return autoFlush;
129 	}
130 
131 	@Override
132 	public void close() throws RepositoryException {
133 		if (!autoFlush) {
134 			clear();
135 			setAutoFlush(true);
136 		}
137 		super.close();
138 	}
139 
140 	@Override
141 	public void setAutoCommit(boolean autoCommit) throws RepositoryException {
142 		if (autoCommit && !autoFlush) {
143 			flush();
144 		}
145 		super.setAutoCommit(autoCommit);
146 	}
147 
148 	@Override
149 	public void commit() throws RepositoryException {
150 		if (!autoFlush) {
151 			flush();
152 		}
153 		super.commit();
154 	}
155 
156 	@Override
157 	public void rollback() throws RepositoryException {
158 		if (!autoFlush) {
159 			clear();
160 		}
161 		super.rollback();
162 	}
163 
164 	@Override
165 	public synchronized RepositoryResult<Statement> getStatements(
166 			Resource subj, URI pred, Value obj, final boolean includeInferred,
167 			Resource... contexts) throws RepositoryException {
168 		if (isDelegatingRead())
169 			return super.getStatements(subj, pred, obj, includeInferred,
170 					contexts);
171 		final Iterator<Statement> include;
172 		final boolean excluded = removed.hasStatement(subj, pred, obj,
173 				includeInferred, contexts);
174 		final RepositoryResult<Statement> result;
175 		include = added.getStatements(subj, pred, obj, includeInferred,
176 				contexts).asList().iterator();
177 		result = getDelegate().getStatements(subj, pred, obj, includeInferred,
178 				contexts);
179 		if (!include.hasNext() && !excluded)
180 			return result;
181 		final CloseableIteration<Statement, RepositoryException> filtered;
182 		if (excluded) {
183 			filtered = new FilterIteration<Statement, RepositoryException>(
184 					result) {
185 				@Override
186 				protected boolean accept(Statement stmt)
187 						throws RepositoryException {
188 					return !removed.hasStatement(stmt, includeInferred);
189 				}
190 			};
191 		} else {
192 			filtered = result;
193 		}
194 		CloseableIteration<Statement, RepositoryException> iter;
195 		iter = new CloseableIteration<Statement, RepositoryException>() {
196 			private Statement stmt;
197 
198 			public void close() throws RepositoryException {
199 				filtered.close();
200 			}
201 
202 			public boolean hasNext() throws RepositoryException {
203 				return include.hasNext() || filtered.hasNext();
204 			}
205 
206 			public Statement next() throws RepositoryException {
207 				if (include.hasNext())
208 					return stmt = include.next();
209 				return stmt = filtered.next();
210 			}
211 
212 			public void remove() throws RepositoryException {
213 				FlushableConnection.this.remove(stmt);
214 			}
215 		};
216 		return new RepositoryResult<Statement>(iter);
217 	}
218 
219 	@Override
220 	public boolean isEmpty() throws RepositoryException {
221 		if (isDelegatingRead())
222 			return super.isEmpty();
223 		if (!added.isEmpty())
224 			return false;
225 		if (super.isEmpty())
226 			return true;
227 		if (removed.isEmpty())
228 			return false;
229 		if (super.size() > removed.size())
230 			return false;
231 		return super.isEmpty();
232 	}
233 
234 	@Override
235 	public long size(Resource... contexts) throws RepositoryException {
236 		if (isDelegatingRead())
237 			return super.size();
238 		long size = super.size(contexts);
239 		long rsize = removed.size(contexts);
240 		long asize = added.size(contexts);
241 		return size - rsize + asize;
242 	}
243 
244 	@Override
245 	protected synchronized void addWithoutCommit(Resource subject,
246 			URI predicate, Value object, Resource... contexts)
247 			throws RepositoryException {
248 		if (!removed.hasStatement(subject, predicate, object, false, contexts)) {
249 			added.add(subject, predicate, object, contexts);
250 		}
251 		removed.remove(subject, predicate, object, contexts);
252 	}
253 
254 	@Override
255 	protected synchronized void removeWithoutCommit(Resource subject,
256 			URI predicate, Value object, Resource... contexts)
257 			throws RepositoryException {
258 		RDFHandler handler = new RDFInserter(removed);
259 		try {
260 			getDelegate().exportStatements(subject, predicate, object, true,
261 					handler, contexts);
262 		} catch (RDFHandlerException e) {
263 			if (e.getCause() instanceof RepositoryException)
264 				throw (RepositoryException) e.getCause();
265 			throw new AssertionError(e);
266 		}
267 		added.remove(subject, predicate, object, contexts);
268 	}
269 
270 	private void initUnFlushedStatements() throws RepositoryException {
271 		Repository addRepo = new SailRepository(new MemoryStore());
272 		Repository removeRepo = new SailRepository(new MemoryStore());
273 		addRepo.initialize();
274 		removeRepo.initialize();
275 		added = addRepo.getConnection();
276 		removed = removeRepo.getConnection();
277 	}
278 
279 	private void destroyUnflushedStatements() throws RepositoryException {
280 		Repository add = added.getRepository();
281 		Repository remove = removed.getRepository();
282 		added.close();
283 		removed.close();
284 		add.shutDown();
285 		remove.shutDown();
286 	}
287 }