View Javadoc

1   /*
2    * Copyright (c) 2007, Peter Mika All rights reserved.
3    * 
4    * Redistribution and use in source and binary forms, with or without
5    * modification, are permitted provided that the following conditions are met:
6    * 
7    * - Redistributions of source code must retain the above copyright notice, this
8    *   list of conditions and the following disclaimer.
9    * - Redistributions in binary form must reproduce the above copyright notice,
10   *   this list of conditions and the following disclaimer in the documentation
11   *   and/or other materials provided with the distribution. 
12   * - Neither the name of the openrdf.org nor the names of its contributors may
13   *   be used to endorse or promote products derived from this software without
14   *   specific prior written permission.
15   * 
16   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17   * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18   * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20   * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21   * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22   * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23   * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24   * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26   * POSSIBILITY OF SUCH DAMAGE.
27   * 
28   */
29  package org.openrdf.elmo.scutter;
30  
31  import info.aduna.iteration.Iterations;
32  
33  import java.io.IOException;
34  import java.net.MalformedURLException;
35  import java.net.URL;
36  import java.util.Collection;
37  import java.util.Date;
38  import java.util.List;
39  
40  import org.apache.commons.httpclient.HttpException;
41  import org.openrdf.OpenRDFException;
42  import org.openrdf.model.Resource;
43  import org.openrdf.model.Statement;
44  import org.openrdf.model.URI;
45  import org.openrdf.model.Value;
46  import org.openrdf.model.vocabulary.RDFS;
47  import org.openrdf.repository.Repository;
48  import org.openrdf.repository.RepositoryConnection;
49  import org.openrdf.repository.RepositoryException;
50  import org.openrdf.repository.RepositoryResult;
51  import org.openrdf.repository.sail.SailRepository;
52  import org.openrdf.rio.ParseErrorListener;
53  import org.openrdf.rio.RDFHandlerException;
54  import org.openrdf.rio.RDFParseException;
55  import org.openrdf.rio.RDFParser;
56  import org.openrdf.rio.helpers.StatementCollector;
57  import org.openrdf.rio.rdfxml.RDFXMLParser;
58  import org.openrdf.sail.memory.MemoryStore;
59  import org.slf4j.Logger;
60  import org.slf4j.LoggerFactory;
61  
62  /**
63   * A SimpleRetriever is instantiated by the Scutter class for every URL to be
64   * visited. The Retriver fetches that document and adds the allowed statements
65   * to an in-memory repository. Statements can be optionally filtered by specying
66   * a StatementFilter. The seeAlso statements found in the document are added to
67   * the scutter queue.
68   * 
69   * @author Peter Mika (original version for Jena by Matt Biddulph.)
70   * @version $Revision: 1.8 $
71   */
72  
73  public class SimpleRetriever implements Retriever {
74  
75  	protected URL _url;
76  
77  	protected Scutter _scutter;
78  
79  	protected Repository _repository = null;
80  
81  	protected boolean _ok = true;
82  
83  	protected final static Logger _logger = LoggerFactory
84  			.getLogger(SimpleRetriever.class);
85  
86  	// NOTE: we can make parser static, but only if the parser itself is
87  	// threadsafe
88  	// and that is doubtful: there is only one StatementHandler inside and that
89  	// should be different for every URL (content goes into separate
90  	// repositories)
91  	protected RDFParser _parser = null;
92  
93  	protected StatementFilter _filter = null;
94  
95  	protected DocumentHandler _handler = null;
96  
97  	public SimpleRetriever(final URL url, final Repository repository,
98  			final Scutter scutter) {
99  		setUrl(url);
100 		setRepository(repository);
101 		setScutter(scutter);
102 		setFilter(new DefaultStatementFilter());
103 		// Use the SAX2-compliant Xerces parser:
104 		System.setProperty("org.xml.sax.driver",
105 				"org.apache.xerces.parsers.SAXParser");
106 		_parser = new RDFXMLParser(repository.getValueFactory());
107 		_parser.setVerifyData(true);
108 		_parser.setStopAtFirstError(false);
109 
110 		_handler = new SimpleDocumentHandler(_filter);
111 	}
112 
113 	static class DefaultStatementFilter implements StatementFilter {
114 		public boolean allowStatement(Resource subject, URI predicate,
115 				Value object) {
116 			/*
117 			 * Example: store just RDF, RDF-S, FOAF and DC triples. Ignore
118 			 * statements about resources in the SOCIONET namespace.
119 			 * 
120 			 * if
121 			 * ((predicate.toString().startsWith(flink.util.Constants.FOAF_NS) ||
122 			 * predicate.toString().startsWith(flink.util.Constants.WGS84_POS_NS) ||
123 			 * predicate.toString().startsWith(flink.util.Constants.RDF_NS) ||
124 			 * predicate.toString().startsWith(flink.util.Constants.RDFS_NS)) &&
125 			 * !subject.toString().startsWith(flink.util.Constants.SOCIONET_NS)) {
126 			 * 
127 			 * return true; } else { return false; }
128 			 * 
129 			 */
130 			return true;
131 		}
132 	}
133 
134 	class SimpleDocumentHandler extends StatementCollector implements
135 			DocumentHandler {
136 
137 		public final static int DEFAULT_MAXSIZE = 10000;
138 
139 		// Interrupt execution after this many statements have been processed
140 		protected final int _maxSize = DEFAULT_MAXSIZE;
141 
142 		protected int _stmtCount = 0;
143 
144 		protected StatementFilter _filter = new DefaultStatementFilter();
145 
146 		public SimpleDocumentHandler(StatementFilter filter) {
147 			if (filter != null) {
148 				_filter = filter;
149 			}
150 		}
151 
152 		public void parse(URL url) {
153 
154 			// Parse the data from inputStream,
155 			// resolving any relative URIs against the URL of the source
156 			// Set statement and error handler
157 			_parser.setParseErrorListener(new RetrieverErrorListener());
158 			_parser.setRDFHandler(this);
159 			try {
160 				_parser.parse(Util.getDocumentAsInputStream(url.toString()),
161 						url.toString());
162 			} catch (RDFParseException e) {
163 				_logger.error(e.getMessage(), e);
164 			} catch (RDFHandlerException e) {
165 				_logger.error(e.getMessage(), e);
166 			} catch (HttpException e) {
167 				_logger.error(e.getMessage(), e);
168 			} catch (IOException e) {
169 				_logger.error(e.getMessage(), e);
170 			}
171 
172 		}
173 
174 		public void handleStatement(Statement stmt) {
175 			// Stop parsing in a brutal way if we exceeded the limit
176 			if (_stmtCount++ > _maxSize) {
177 				throw new RuntimeException("Document longer than " + _maxSize
178 						+ " statements, interrupting parsing.");
179 			}
180 			if (_filter.allowStatement(stmt.getSubject(), stmt.getPredicate(),
181 					stmt.getObject())) {
182 
183 				super.handleStatement(stmt);
184 
185 			}
186 		}
187 
188 		public boolean followLinks() {
189 			return true;
190 		}
191 
192 		public boolean aggregateContent() {
193 			return true;
194 		}
195 
196 		public Collection<Statement> getCollectedStatements() {
197 			return getStatements();
198 
199 		}
200 
201 	}
202 
203 	class RetrieverErrorListener implements ParseErrorListener {
204 
205 		public void error(java.lang.String msg, int lineNo, int colNo) {
206 			_logger.error(msg);
207 			_ok = false;
208 		}
209 
210 		public void fatalError(java.lang.String msg, int lineNo, int colNo) {
211 			_logger.error(msg);
212 			_ok = false;
213 		}
214 
215 		public void warning(java.lang.String msg, int lineNo, int colNo) {
216 
217 		}
218 	}
219 
220 	public void setRepository(Repository r) {
221 		_repository = r;
222 	}
223 
224 	public void setScutter(Scutter s) {
225 		_scutter = s;
226 	}
227 
228 	public void setUrl(URL u) {
229 		_url = u;
230 	}
231 
232 	public void setFilter(StatementFilter f) {
233 		_filter = f;
234 	}
235 
236 	public void run() {
237 		Repository temp = null;
238 		try {
239 			_logger.debug("Parsing " + _url);
240 
241 			_handler.parse(_url);
242 
243 			// Create new non-inferencing in-memory repository
244 			temp = new SailRepository(new MemoryStore());
245 			temp.initialize();
246 
247 			// Add the collected statements to the temporary repository
248 			RepositoryConnection tempCon = null;
249 			try {
250 				tempCon = temp.getConnection();
251 				tempCon.add(_handler.getCollectedStatements());
252 			} finally {
253 				tempCon.close();
254 			}
255 
256 			if (_ok) {
257 				_logger.info("Finished retrieving and parsing.");
258 
259 				if (!Thread.currentThread().isInterrupted()) {
260 					if (_handler.followLinks()) {
261 						// Add seeAlso links to the scutter queue
262 						addSeeAlsos(temp);
263 					} else {
264 						_logger.debug("Ignoring links in " + _url);
265 					}
266 					if (_handler.aggregateContent()) {
267 						// Add the statements to the repository
268 						aggregate(temp, _url.toString());
269 					} else {
270 						_logger.debug("Ignoring content in " + _url);
271 					}
272 				}
273 			} else {
274 				_logger.warn("Errors during parsing, not adding to the model.");
275 			}
276 		} catch (Throwable t) {
277 			_logger.warn("Errors loading " + _url, t);
278 		} finally {
279 			temp = null;
280 		}
281 	}
282 
283 	protected void aggregate(Repository incoming, String url)
284 			throws RepositoryException {
285 		Date date = new Date();
286 		int count = 0;
287 
288 		// There is only a single aggregator and we synchronize on that
289 		// so we shouldn't need transactions, right?
290 		RepositoryConnection con = null;
291 		try {
292 			con = incoming.getConnection();
293 			RepositoryResult<Statement> stmts = con.getStatements(null, null,
294 					null, true);
295 
296 			List<Statement> stmtList = Iterations.asList(stmts);
297 
298 			try {
299 
300 				for (Statement nextStatement : stmtList) {
301 					
302 					if (_logger.isDebugEnabled()) {
303 						if (nextStatement.getPredicate().getLocalName().equals("name")) {
304 							_logger
305 									.info("Found name: "
306 											+ nextStatement.getObject());
307 						}
308 					}
309 					// TODO: store metadata in context
310 					// if (_scutter.getMetadata()) {
311 					// Util.writeReificationStatement(rdfxml, nextStatement,
312 					// url,
313 					// date);
314 					// rdfxml.writeStatement(nextStatement.getSubject(),
315 					// nextStatement.getPredicate(), nextStatement
316 					// .getObject());
317 					// count += 7;
318 					// }
319 					count++;
320 
321 					// System.err.println("*** Document from " + url + "
322 					// starts here.");
323 					// System.err.println(writer.toString());
324 					// System.err.println("*** Document from " + url + "
325 					// ends here");
326 					// Upload RDF document to Sesame
327 				}
328 			} finally {
329 				// TODO: do I need to close this?
330 				stmts.close(); // make sure the result object is closed
331 				// properly
332 			}
333 
334 			if (count > 0) {
335 				con = null;
336 				try {
337 					con = _repository.getConnection();
338 					if (_scutter.getStoreMetadata()) {
339 						con.add(stmtList, _scutter.getContext());
340 					} else {
341 						con.add(stmtList);		
342 					}
343 					_logger.debug("Added " + count + " triples to model.");
344 				} catch (OpenRDFException e) {
345 					_logger.error("Uploading document from " + url
346 							+ " produced errors.");
347 				} finally {
348 					if (con != null)
349 						con.close();
350 				}
351 
352 			} else {
353 				_logger.debug("No statements in document " + url);
354 			}
355 		} catch (OpenRDFException e) {
356 			_logger.error(e.getMessage(), e);
357 		} finally {
358 			if (con != null)
359 				con.close();
360 		}
361 	}
362 
363 	/**
364 	 * Add all rdfs:seeAlso statements to the queue from the specified
365 	 * repository.
366 	 * 
367 	 * @param incoming
368 	 * @throws RepositoryException
369 	 */
370 	public void addSeeAlsos(Repository incoming) throws RepositoryException {
371 		RepositoryConnection con = null;
372 
373 		try {
374 			con = incoming.getConnection();
375 			// Get statements with the rdfs:seeAlso predicate
376 			RepositoryResult<Statement> stmts = con.getStatements(null,
377 					RDFS.SEEALSO, null, true);
378 
379 			try {
380 				while (stmts.hasNext()) {
381 					Statement nextStatement = stmts.next();
382 					// should be a safe cast...
383 					URI object = (URI) nextStatement.getObject();
384 					try {
385 						_scutter.addURL(new URL(object.toString()));
386 					} catch (MalformedURLException e) {
387 						// shouldn't really happen...
388 						_logger.warn("Bad URL: " + object.toString());
389 					}
390 				}
391 			} finally {
392 				if (stmts != null)
393 					stmts.close();
394 			}
395 		} catch (OpenRDFException e) {
396 			_logger.error(e.getMessage(), e);
397 		} catch (ClassCastException e) {
398 			_logger.error(e.getMessage(), e);
399 		} finally {
400 			if (con != null)
401 				con.close();
402 		}
403 
404 	}
405 
406 }