1
2
3
4
5
6 package org.openrdf.repository.readahead;
7
8 import info.aduna.iteration.CloseableIteration;
9 import info.aduna.iteration.CloseableIteratorIteration;
10
11 import java.util.Iterator;
12
13 import org.openrdf.model.Resource;
14 import org.openrdf.model.Statement;
15 import org.openrdf.model.URI;
16 import org.openrdf.model.Value;
17 import org.openrdf.repository.Repository;
18 import org.openrdf.repository.RepositoryConnection;
19 import org.openrdf.repository.RepositoryException;
20 import org.openrdf.repository.RepositoryResult;
21 import org.openrdf.repository.delegate.VersioningConnection;
22 import org.openrdf.repository.sail.SailRepository;
23 import org.openrdf.sail.memory.MemoryStore;
24
25 class ReadAheadConnection extends VersioningConnection {
26
27 private int _subjCacheVersion = -1;
28
29 private RepositoryConnection _subjCacheExpl;
30
31 private RepositoryConnection _subjCacheInfr;
32
33 public ReadAheadConnection(Repository repository,
34 RepositoryConnection delegate) {
35 super(repository, delegate, 0);
36 }
37
38 @Override
39 public synchronized RepositoryResult<Statement> getStatements(
40 Resource subj, URI pred, Value obj, boolean inf, Resource... ctx)
41 throws RepositoryException {
42 if (subj == null)
43 return super.getStatements(subj, pred, obj, inf, ctx);
44 RepositoryConnection cache = getSubjCache(inf);
45 if (cache.hasStatement(subj, null, null, inf)) {
46 return detach(cache.getStatements(subj, pred, obj, inf, ctx));
47 }
48 cache.setAutoCommit(false);
49 RepositoryResult<Statement> stIter;
50 stIter = super.getStatements(subj, null, null, inf);
51 try {
52 while (stIter.hasNext()) {
53 cache.add(stIter.next());
54 }
55 } finally {
56 stIter.close();
57 }
58 cache.setAutoCommit(true);
59 return detach(cache.getStatements(subj, pred, obj, inf, ctx));
60 }
61
62 @Override
63 public void close() throws RepositoryException {
64 Repository cacheRepo;
65 if (_subjCacheInfr != null) {
66 cacheRepo = _subjCacheInfr.getRepository();
67 _subjCacheInfr.close();
68 cacheRepo.shutDown();
69 }
70 if (_subjCacheExpl != null) {
71 cacheRepo = _subjCacheExpl.getRepository();
72 _subjCacheExpl.close();
73 cacheRepo.shutDown();
74 }
75 super.close();
76 }
77
78 private RepositoryConnection getSubjCache(boolean inf)
79 throws RepositoryException {
80 Repository cacheRepo;
81 if (_subjCacheVersion < getVersion()) {
82 if (_subjCacheInfr != null)
83 _subjCacheInfr.clear();
84 if (_subjCacheExpl != null)
85 _subjCacheExpl.clear();
86 _subjCacheVersion = getVersion();
87 }
88 if (inf) {
89 if (_subjCacheInfr == null) {
90 cacheRepo = new SailRepository(new MemoryStore());
91 cacheRepo.initialize();
92 _subjCacheInfr = cacheRepo.getConnection();
93 }
94 return _subjCacheInfr;
95 }
96 if (_subjCacheExpl == null) {
97 cacheRepo = new SailRepository(new MemoryStore());
98 cacheRepo.initialize();
99 _subjCacheExpl = cacheRepo.getConnection();
100 }
101 return _subjCacheExpl;
102 }
103
104 private RepositoryResult<Statement> detach(RepositoryResult<Statement> stmts)
105 throws RepositoryException {
106 try {
107 Iterator<Statement> iter = stmts.asList().iterator();
108 CloseableIteration<Statement, RepositoryException> itern;
109 itern = new CloseableIteratorIteration<Statement, RepositoryException>(
110 iter);
111 return new RepositoryResult<Statement>(itern);
112 } finally {
113 stmts.close();
114 }
115 }
116
117 }