1
2
3
4
5
6 package org.openrdf.repository.flushable;
7
8 import info.aduna.iteration.CloseableIteration;
9 import info.aduna.iteration.FilterIteration;
10
11 import java.util.Iterator;
12
13 import org.openrdf.model.Resource;
14 import org.openrdf.model.Statement;
15 import org.openrdf.model.URI;
16 import org.openrdf.model.Value;
17 import org.openrdf.repository.Repository;
18 import org.openrdf.repository.RepositoryConnection;
19 import org.openrdf.repository.RepositoryException;
20 import org.openrdf.repository.RepositoryResult;
21 import org.openrdf.repository.base.RepositoryConnectionWrapper;
22 import org.openrdf.repository.sail.SailRepository;
23 import org.openrdf.repository.util.RDFInserter;
24 import org.openrdf.rio.RDFHandler;
25 import org.openrdf.rio.RDFHandlerException;
26 import org.openrdf.sail.memory.MemoryStore;
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 public class FlushableConnection extends RepositoryConnectionWrapper {
45
46 private boolean autoFlush = true;
47
48 private RepositoryConnection added;
49
50 RepositoryConnection removed;
51
52 public FlushableConnection(Repository repository)
53 throws RepositoryException {
54 this(repository, repository.getConnection());
55 }
56
57 public FlushableConnection(Repository repository, RepositoryConnection conn)
58 throws RepositoryException {
59 super(repository, conn);
60 if (!autoFlush) {
61 initUnFlushedStatements();
62 }
63 }
64
65 public boolean isAutoFlush() throws RepositoryException {
66 return autoFlush;
67 }
68
69 public void setAutoFlush(boolean autoFlush) throws RepositoryException {
70 if (this.autoFlush == autoFlush)
71 return;
72 if (autoFlush) {
73 flush();
74 destroyUnflushedStatements();
75 } else {
76 initUnFlushedStatements();
77 }
78 this.autoFlush = autoFlush;
79 }
80
81 public void flush() throws RepositoryException {
82 if (autoFlush)
83 return;
84 RepositoryResult<Statement> stmts;
85 if (!added.isEmpty()) {
86 stmts = added.getStatements(null, null, null, false);
87 try {
88 getDelegate().add(stmts);
89 } finally {
90 stmts.close();
91 }
92 added.clear();
93 }
94 if (!removed.isEmpty()) {
95 stmts = removed.getStatements(null, null, null, false);
96 try {
97 while (stmts.hasNext()) {
98 getDelegate().remove(stmts.next());
99 }
100 } finally {
101 stmts.close();
102 }
103 removed.clear();
104 }
105 }
106
107 public void clear() throws RepositoryException {
108 if (autoFlush)
109 return;
110 added.clear();
111 removed.clear();
112 }
113
114 @Override
115 protected boolean isDelegatingAdd() throws RepositoryException {
116 return autoFlush;
117 }
118
119 @Override
120 protected boolean isDelegatingRead() throws RepositoryException {
121 if (autoFlush)
122 return true;
123 return added.isEmpty() && removed.isEmpty();
124 }
125
126 @Override
127 protected boolean isDelegatingRemove() throws RepositoryException {
128 return autoFlush;
129 }
130
131 @Override
132 public void close() throws RepositoryException {
133 if (!autoFlush) {
134 clear();
135 setAutoFlush(true);
136 }
137 super.close();
138 }
139
140 @Override
141 public void setAutoCommit(boolean autoCommit) throws RepositoryException {
142 if (autoCommit && !autoFlush) {
143 flush();
144 }
145 super.setAutoCommit(autoCommit);
146 }
147
148 @Override
149 public void commit() throws RepositoryException {
150 if (!autoFlush) {
151 flush();
152 }
153 super.commit();
154 }
155
156 @Override
157 public void rollback() throws RepositoryException {
158 if (!autoFlush) {
159 clear();
160 }
161 super.rollback();
162 }
163
164 @Override
165 public synchronized RepositoryResult<Statement> getStatements(
166 Resource subj, URI pred, Value obj, final boolean includeInferred,
167 Resource... contexts) throws RepositoryException {
168 if (isDelegatingRead())
169 return super.getStatements(subj, pred, obj, includeInferred,
170 contexts);
171 final Iterator<Statement> include;
172 final boolean excluded = removed.hasStatement(subj, pred, obj,
173 includeInferred, contexts);
174 final RepositoryResult<Statement> result;
175 include = added.getStatements(subj, pred, obj, includeInferred,
176 contexts).asList().iterator();
177 result = getDelegate().getStatements(subj, pred, obj, includeInferred,
178 contexts);
179 if (!include.hasNext() && !excluded)
180 return result;
181 final CloseableIteration<Statement, RepositoryException> filtered;
182 if (excluded) {
183 filtered = new FilterIteration<Statement, RepositoryException>(
184 result) {
185 @Override
186 protected boolean accept(Statement stmt)
187 throws RepositoryException {
188 return !removed.hasStatement(stmt, includeInferred);
189 }
190 };
191 } else {
192 filtered = result;
193 }
194 CloseableIteration<Statement, RepositoryException> iter;
195 iter = new CloseableIteration<Statement, RepositoryException>() {
196 private Statement stmt;
197
198 public void close() throws RepositoryException {
199 filtered.close();
200 }
201
202 public boolean hasNext() throws RepositoryException {
203 return include.hasNext() || filtered.hasNext();
204 }
205
206 public Statement next() throws RepositoryException {
207 if (include.hasNext())
208 return stmt = include.next();
209 return stmt = filtered.next();
210 }
211
212 public void remove() throws RepositoryException {
213 FlushableConnection.this.remove(stmt);
214 }
215 };
216 return new RepositoryResult<Statement>(iter);
217 }
218
219 @Override
220 public boolean isEmpty() throws RepositoryException {
221 if (isDelegatingRead())
222 return super.isEmpty();
223 if (!added.isEmpty())
224 return false;
225 if (super.isEmpty())
226 return true;
227 if (removed.isEmpty())
228 return false;
229 if (super.size() > removed.size())
230 return false;
231 return super.isEmpty();
232 }
233
234 @Override
235 public long size(Resource... contexts) throws RepositoryException {
236 if (isDelegatingRead())
237 return super.size();
238 long size = super.size(contexts);
239 long rsize = removed.size(contexts);
240 long asize = added.size(contexts);
241 return size - rsize + asize;
242 }
243
244 @Override
245 protected synchronized void addWithoutCommit(Resource subject,
246 URI predicate, Value object, Resource... contexts)
247 throws RepositoryException {
248 if (!removed.hasStatement(subject, predicate, object, false, contexts)) {
249 added.add(subject, predicate, object, contexts);
250 }
251 removed.remove(subject, predicate, object, contexts);
252 }
253
254 @Override
255 protected synchronized void removeWithoutCommit(Resource subject,
256 URI predicate, Value object, Resource... contexts)
257 throws RepositoryException {
258 RDFHandler handler = new RDFInserter(removed);
259 try {
260 getDelegate().exportStatements(subject, predicate, object, true,
261 handler, contexts);
262 } catch (RDFHandlerException e) {
263 if (e.getCause() instanceof RepositoryException)
264 throw (RepositoryException) e.getCause();
265 throw new AssertionError(e);
266 }
267 added.remove(subject, predicate, object, contexts);
268 }
269
270 private void initUnFlushedStatements() throws RepositoryException {
271 Repository addRepo = new SailRepository(new MemoryStore());
272 Repository removeRepo = new SailRepository(new MemoryStore());
273 addRepo.initialize();
274 removeRepo.initialize();
275 added = addRepo.getConnection();
276 removed = removeRepo.getConnection();
277 }
278
279 private void destroyUnflushedStatements() throws RepositoryException {
280 Repository add = added.getRepository();
281 Repository remove = removed.getRepository();
282 added.close();
283 removed.close();
284 add.shutDown();
285 remove.shutDown();
286 }
287 }