View Javadoc

1   /*
2    * Copyright (c) 2007, Peter Mika All rights reserved.
3    * 
4    * Redistribution and use in source and binary forms, with or without
5    * modification, are permitted provided that the following conditions are met:
6    * 
7    * - Redistributions of source code must retain the above copyright notice, this
8    *   list of conditions and the following disclaimer.
9    * - Redistributions in binary form must reproduce the above copyright notice,
10   *   this list of conditions and the following disclaimer in the documentation
11   *   and/or other materials provided with the distribution. 
12   * - Neither the name of the openrdf.org nor the names of its contributors may
13   *   be used to endorse or promote products derived from this software without
14   *   specific prior written permission.
15   * 
16   * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17   * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18   * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19   * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20   * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21   * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22   * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23   * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24   * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25   * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26   * POSSIBILITY OF SUCH DAMAGE.
27   * 
28   */
29  package org.openrdf.elmo.scutter;
30  
31  import java.io.BufferedReader;
32  import java.io.BufferedWriter;
33  import java.io.File;
34  import java.io.FileReader;
35  import java.io.FileWriter;
36  import java.io.IOException;
37  import java.io.PrintWriter;
38  import java.net.MalformedURLException;
39  import java.net.URL;
40  import java.util.ArrayList;
41  import java.util.HashSet;
42  import java.util.Hashtable;
43  import java.util.Iterator;
44  import java.util.List;
45  import java.util.Set;
46  import java.util.concurrent.ExecutorService;
47  import java.util.concurrent.Executors;
48  import java.util.concurrent.ThreadPoolExecutor;
49  import java.util.concurrent.TimeUnit;
50  import java.util.regex.Pattern;
51  
52  import org.openrdf.elmo.scutter.Util;
53  import org.openrdf.model.Resource;
54  import org.openrdf.model.Value;
55  import org.openrdf.query.BindingSet;
56  import org.openrdf.query.MalformedQueryException;
57  import org.openrdf.query.QueryEvaluationException;
58  import org.openrdf.query.TupleQueryResult;
59  import org.openrdf.query.QueryLanguage;
60  
61  import org.openrdf.repository.Repository;
62  import org.openrdf.repository.RepositoryConnection;
63  import org.openrdf.repository.RepositoryException;
64  import org.openrdf.repository.RepositoryResult;
65  import org.slf4j.Logger;
66  import org.slf4j.LoggerFactory;
67  
68  /**
69   * Scutter is the main class of the RDF crawler (scutter). It can be invoked
70   * from the command line or built into applications. Scutter implements Runnable
71   * so that it can be run as a background thread.
72   * 
73   * @author Peter Mika (original version for Jena by Matt Biddulph.)
74   * @version $Revision: 1.20 $
75   */
76  
77  public class Scutter implements Runnable {
78  
79  	List<URL> _urls = new ArrayList<URL>(); // the queue of URLs to be visited
80  
81  	HashSet<URL> _visited = new HashSet<URL>(); // the URLs visited so far
82  
83  	private RetrieverFactory _factory = null; // the repository where the
84  
85  	// collected data should be
86  	// added
87  
88  	private boolean _stop = false; // flag to indicate that the crawler should
89  
90  	// be stopped
91  
92  	private final static String VISITED_QUERY = "SELECT DISTINCT url from {x} socionet:provenance {url} "
93  			+ "using namespace "
94  			+ "socionet = <http://www.cs.vu.nl/~pmika/socionet#>";
95  
96  	/**
97  	 * @see EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue
98  	 */
99  	public final static int LINKEDQUEUE_SIZE = 1000; // see
100 
101 	/**
102 	 * @see EDU.oswego.cs.dl.util.concurrent.PooledExecutor
103 	 */
104 	public final static int DEFAULT_MAXTHREADS = 20; // see
105 
106 	/**
107 	 * Time to wait for Scutter to finish after stopped (in sec). before saving
108 	 * queue to disk. Time to wait is dependent on the size of the thread pool.
109 	 */
110 	public final static int STOP_TIME = 1 * 60;
111 
112 	/**
113 	 * Threshold for the number of URLs seen from a domain before it becomes
114 	 * blacklisted
115 	 */
116 	private final static int BLACKLIST_THRESHOLD = 500; // if there are more
117 
118 	/**
119 	 * Stores how many URLs we have visited so far from a domain
120 	 * 
121 	 */
122 	private Hashtable<String, Integer> _domains = new Hashtable<String, Integer>();
123 
124 	private final Logger _logger = LoggerFactory.getLogger(Scutter.class);
125 
126 	/**
127 	 * Defines the whitelist (domains that are allowed even when on blacklist)
128 	 */
129 	private Pattern _domainPattern = null;
130 
131 	/**
132 	 * Defines whether we store metadata with statements
133 	 */
134 	private boolean _metadata = false; //
135 
136 	/**
137 	 * Enables automatic blacklisting
138 	 */
139 	private boolean _autoBlackList = true;
140 
141 	/**
142 	 * Size limit (in KB) for files to be parsed. Files larger will not be
143 	 * loaded.
144 	 */
145 	public final static int DEFAULT_SIZE_LIMIT = 1000; // in kilobytes
146 
147 	/**
148 	 * @see EDU.oswego.cs.dl.util.concurrent.PooledExecutor
149 	 */
150 	private int _sizelimit = DEFAULT_SIZE_LIMIT;
151 
152 	/**
153 	 * @see EDU.oswego.cs.dl.util.concurrent.PooledExecutor
154 	 */
155 	private int _maxThreads = DEFAULT_MAXTHREADS;
156 
157 	private File _queueFile;
158 
159 	private File _blacklistFile;
160 	
161 	private Resource[] _context;
162 
163 	/**
164 	 * Scutter command-line tool.
165 	 * 
166 	 * @param args
167 	 *            First argument is the URL of Sesame server, second argument is
168 	 *            the repository name. Remaining arguments are interpreted as
169 	 *            URLs and are used to initialize the queue.
170 	 */
171 	public final static void main(String[] args) throws Exception {
172 		// Log4j configuration
173 
174 		// BasicConfigurator.configure();
175 		// Logger.getRootLogger().setLevel(Level.INFO);
176 
177 		Scutter s = null;
178 		try {
179 			if (args.length < 3) {
180 				throw new Exception("Usage: java -jar elmo-scutter.jar <server> <repository> <starturl1> <starturl2> ...");
181 			}
182 			String[] urls = new String[args.length - 2];
183 			for (int i = 2; i < args.length; i++) {
184 				urls[i - 2] = args[i];
185 			}
186 			Repository repository = Util.initRepository(args[0], args[1]);
187 
188 			// Create factory for simple retrievers
189 			RetrieverFactory factory = new RetrieverFactory(repository, false);
190 			s = new Scutter(factory);
191 			s.initQueue(urls);
192 		} catch (Exception e) {
193 			e.printStackTrace();
194 			System.exit(1);
195 		}
196 		s.run();
197 	}
198 
199 	/**
200 	 * Add a list of URLs to the queue
201 	 * 
202 	 * @param urls
203 	 *            Array of strings (URLs)
204 	 */
205 
206 	public void initQueue(String[] urls) {
207 		for (int i = 0; i < urls.length; i++) {
208 			try {
209 				System.err.println("considering: " + urls[i]);
210 				if (urls[i] != null) {
211 					addURL(new URL(urls[i].trim()));
212 				}
213 			} catch (MalformedURLException e) {
214 				_logger.error(e.toString());
215 			}
216 		}
217 	}
218 
219 	/**
220 	 * Get the queue. Beware: may mutate while the scutter is running.
221 	 * 
222 	 * @return List of URL objects
223 	 */
224 	public List getQueue() {
225 		return _urls;
226 	}
227 
228 	/**
229 	 * Get the set of URLs visited so far. Beware: may mutate while the scutter
230 	 * is running.
231 	 * 
232 	 * @return Set of URL objects
233 	 */
234 	public Set getVisited() {
235 		return _visited;
236 	}
237 
238 	/**
239 	 * Clear the queue and visited lists.
240 	 * 
241 	 * @return false if clear fails
242 	 */
243 	public boolean clear() {
244 		try {
245 			_urls.clear();
246 			_visited.clear();
247 			_domains.clear();
248 			_logger.info("Queue and visited lists are cleared");
249 			return true;
250 		} catch (Exception e) {
251 			_logger.error(e.toString());
252 			return false;
253 		}
254 	}
255 
256 	protected void addVisited(URL url) {
257 		_visited.add(url);
258 
259 		String host = Scutter.getHost(url);
260 		synchronized (_domains) {
261 			if (_domains.containsKey(host)) {
262 				int currentCount = ((Integer) _domains.get(host)).intValue();
263 				_domains.put(host, new Integer(currentCount + 1));
264 			} else {
265 				_domains.put(host, new Integer(1));
266 			}
267 		}
268 	}
269 
270 	/**
271 	 * Load the URLs of sources visited so far from the repository.
272 	 * 
273 	 * @throws RepositoryException
274 	 * @throws RepositoryException
275 	 * @throws QueryEvaluationException
276 	 * @throws QueryEvaluationException
277 	 * @throws MalformedQueryException
278 	 * 
279 	 */
280 	protected void loadVisited() {
281 		//Visited URLs are simply the set of the contexts in which statements are placed
282 		try {
283 			RepositoryConnection con = _factory.getTarget().getConnection();
284 			RepositoryResult<Resource> results = null; 
285 			 
286 			try {
287 				results = con.getContextIDs(); 
288 			   
289 				while (results.hasNext()) {
290 					Resource nextContext = results.next();
291 					try {
292 						addVisited(new URL(nextContext.toString()));
293 					} catch (MalformedURLException mue) {
294 						// ignore
295 					}
296 					_logger.info("Visited URLs reloaded.");
297 				}
298 			} finally {
299 				if (results != null) results.close();
300 				con.close();
301 			}
302 		} catch (Exception e) {
303 			_logger.error("Reloading visited URLs failed");
304 			_logger.error(e.getMessage(), e);
305 		}
306 	}
307 
308 	/**
309 	 * Add URLs to the queue from a file on the disk. Format is one URL per
310 	 * line.
311 	 * 
312 	 * @return number of URLs loaded
313 	 */
314 	public int loadQueue() {
315 		int result = 0;
316 
317 		BufferedReader in = null;
318 		try {
319 			in = new BufferedReader(new FileReader(_queueFile));
320 			String currentLine = in.readLine();
321 
322 			while (currentLine != null) {
323 				try {
324 					addURL(new URL(currentLine.trim()));
325 					result++;
326 				} catch (MalformedURLException mue) {
327 					// error in the queue file
328 				}
329 
330 				currentLine = in.readLine();
331 			}
332 
333 			_logger.info("Queue reloaded from file.");
334 		} catch (IOException ioe) {
335 			_logger.error("Loading queue failed.");
336 		} finally {
337 			if (in != null)
338 				try {
339 					in.close();
340 				} catch (IOException e) {
341 				}
342 		}
343 
344 		return result;
345 	}
346 
347 	/**
348 	 * Save the status of the queue to the disk. Format is one URL per line.
349 	 * 
350 	 */
351 	public void saveQueue() {
352 		PrintWriter out = null;
353 		try {
354 			if (_queueFile != null) {
355 				_queueFile.delete();
356 				// returns false if the file already exists
357 				_queueFile.createNewFile();
358 			}
359 			out = new PrintWriter(
360 					new BufferedWriter(new FileWriter(_queueFile)));
361 			synchronized (_urls) {
362 				for (int i = 0; i < _urls.size(); i++) {
363 					out.println(_urls.get(i));
364 				}
365 			}
366 
367 			out.flush();
368 
369 			_logger.info("Queue saved to file.");
370 		} catch (IOException ioe) {
371 			_logger
372 					.error("Creating or writing queue file failed, check servlet configuration.");
373 			// temp file is null
374 			_queueFile = null;
375 		} finally {
376 			if (out != null)
377 				out.close();
378 		}
379 	}
380 
381 	/**
382 	 * Add prefixes to the blacklist from a file on the disk. Format is one
383 	 * prefix per line.
384 	 * 
385 	 * @return number of URLs loaded
386 	 */
387 	public int loadBlacklist() {
388 		int result = 0;
389 
390 		BufferedReader in = null;
391 		try {
392 			in = new BufferedReader(new FileReader(_blacklistFile));
393 			String currentLine = in.readLine();
394 
395 			while (currentLine != null) {
396 				synchronized (_domains) {
397 					_domains.put(currentLine.trim(), new Integer(
398 							BLACKLIST_THRESHOLD + 1));
399 				}
400 				result++;
401 				currentLine = in.readLine();
402 			}
403 
404 			_logger.info("Blacklist reloaded from file.");
405 		} catch (IOException ioe) {
406 			_logger.error("Loading blacklist failed.");
407 		} finally {
408 			if (in != null)
409 				try {
410 					in.close();
411 				} catch (IOException e) {
412 
413 				}
414 		}
415 
416 		return result;
417 	}
418 
419 	/**
420 	 * Save the status of the blacklist to the disk. Format is one prefix per
421 	 * line.
422 	 * 
423 	 * @throws IOException
424 	 */
425 	public void saveBlacklist() throws IOException {
426 		PrintWriter out = null;
427 		try {
428 			if (_blacklistFile != null) {
429 				_blacklistFile.delete();
430 				// returns false if the file already exists
431 				_blacklistFile.createNewFile();
432 			}
433 			out = new PrintWriter(new BufferedWriter(new FileWriter(
434 					_blacklistFile)));
435 			Iterator it = _domains.keySet().iterator();
436 			while (it.hasNext()) {
437 				String host = (String) it.next();
438 				if (((Integer) _domains.get(host)).intValue() > BLACKLIST_THRESHOLD) {
439 					out.println(host);
440 				}
441 			}
442 
443 			out.flush();
444 
445 			_logger.info("Blacklist saved to file.");
446 		} catch (IOException ioe) {
447 			_logger
448 					.error("Creating or writing blacklist file failed, check servlet configuration.");
449 			// temp file is null
450 			_blacklistFile = null;
451 		} finally {
452 			if (out != null)
453 				out.close();
454 		}
455 	}
456 
457 	/**
458 	 * Stop the scutter. The thread will sleep for a specified time to allow the
459 	 * scutter to finish.
460 	 * 
461 	 * @throws IOException
462 	 */
463 	public void stop() throws IOException {
464 		_stop = true;
465 		// wait a bit for it to finish
466 		try {
467 			Thread.sleep(STOP_TIME);
468 		} catch (InterruptedException ie) {
469 		}
470 		if (_queueFile != null)
471 			saveQueue();
472 		if (_blacklistFile != null)
473 			saveBlacklist();
474 
475 		_logger.info("Scutter stopped.");
476 	}
477 
478 	/**
479 	 * Sets a pattern (regular expression) for limiting crawling to those URLs
480 	 * that match this pattern. In other words, this pattern defines the
481 	 * whitelist.
482 	 * 
483 	 * @param p
484 	 *            Pattern to use
485 	 */
486 	public void setDomainPattern(Pattern p) {
487 		if (p != null) {
488 			_domainPattern = p;
489 		}
490 	}
491 
492 	/**
493 	 * Return the pattern used for the whitelist.
494 	 * 
495 	 * @return Pattern
496 	 */
497 	public Pattern getDomainPattern() {
498 		return _domainPattern;
499 	}
500 
501 	/**
502 	 * Set whether the scutter should produce metadata.
503 	 * 
504 	 * @param metadata
505 	 */
506 	public void setStoreMetadata(boolean metadata) {
507 		_metadata = metadata;
508 	}
509 
510 	/**
511 	 * Determine whether the scutter is set to produce metadata.
512 	 * 
513 	 * @return Flag indicating if the metadata feature is turned on
514 	 */
515 	public boolean getStoreMetadata() {
516 		return _metadata;
517 	}
518 
519 	/**
520 	 * Set whether the scutter should automatically put sites on blacklist after
521 	 * a number of profiles has been collected from that site.
522 	 * 
523 	 * @param metadata
524 	 */
525 	public void setAutoBlackList(boolean blacklist) {
526 		_autoBlackList = blacklist;
527 	}
528 
529 	/**
530 	 * Determine whether automatic blacklisting is enabled.
531 	 * 
532 	 * @return Flag indicating if the metadata feature is turned on
533 	 */
534 	public boolean getAutoBlackList() {
535 		return _autoBlackList;
536 	}
537 
538 	/**
539 	 * Set the size limit for files to be loaded
540 	 * 
541 	 * @param sizelimit
542 	 */
543 	public void setSizeLimit(int sizelimit) {
544 		_sizelimit = sizelimit;
545 	}
546 
547 	/**
548 	 * 
549 	 * 
550 	 * @return Size limit for files to be loaded
551 	 */
552 	public int getSizeLimit() {
553 		return _sizelimit;
554 	}
555 
556 	/**
557 	 * @return Returns the maxThreads.
558 	 */
559 	public int getMaxThreads() {
560 		return _maxThreads;
561 	}
562 
563 	/**
564 	 * @param threads
565 	 *            The maxThreads to set.
566 	 */
567 	public void setMaxThreads(int threads) {
568 		_maxThreads = threads;
569 	}
570 
571 	/**
572 	 * Create a new scutter.
573 	 * 
574 	 * @param repository
575 	 *            Sesame repository to be used for storing the data
576 	 * 
577 	 * @throws Exception
578 	 */
579 	public Scutter(RetrieverFactory factory) throws Exception {
580 		_factory = factory;
581 
582 		loadVisited();
583 	}
584 
585 	private void shutdownAndAwaitTermination(ExecutorService pool) {
586 		pool.shutdown(); // Disable new tasks from being submitted
587 		try {
588 			// Wait a while for existing tasks to terminate
589 			if (!pool.awaitTermination(STOP_TIME, TimeUnit.SECONDS)) {
590 				pool.shutdownNow(); // Cancel currently executing tasks
591 				// Wait a while for tasks to respond to being cancelled
592 				if (!pool.awaitTermination(STOP_TIME, TimeUnit.SECONDS))
593 					System.err.println("Pool did not terminate");
594 			}
595 		} catch (InterruptedException ie) {
596 			// (Re-)Cancel if current thread also interrupted
597 			pool.shutdownNow();
598 			// Preserve interrupt status
599 			Thread.currentThread().interrupt();
600 		}
601 	}
602 
603 	/**
604 	 * Main method that loops infinitely or until the scutter is stopped.
605 	 * 
606 	 */
607 	public void run() {
608 		_logger.info("Scutter started.");
609 
610 		if (_queueFile != null)
611 			loadQueue();
612 		if (_blacklistFile != null)
613 			loadBlacklist();
614 
615 		_stop = false;
616 
617 		ExecutorService executor = Executors.newFixedThreadPool(_maxThreads);
618 
619 		while (!_stop) {
620 
621 			if (_urls.size() > 0) {
622 
623 				Retriever r = null;
624 				URL url = null;
625 				synchronized (_urls) {
626 					// Need to instantiate new object in case this thread
627 					// modifies the value of url
628 					try {
629 						url = new URL(((URL) _urls.remove(0)).toString());
630 					} catch (MalformedURLException mue) {
631 						_logger.warn("Malformed URL: " + url.toString());
632 					}
633 					/*
634 					 * //Check Content-Length try { if
635 					 * (Util.getContentLength(url.toString()) > _sizelimit *
636 					 * 1000) { _logger.info("File is too large: " + url);
637 					 * continue; } } catch (HttpException e) {
638 					 * _logger.warn("Checking content length failed for " +
639 					 * url); } catch (IOException ioe) { _logger.warn("Checking
640 					 * content length failed for " + url); } catch
641 					 * (RuntimeException rte) { _logger.warn("Checking content
642 					 * length failed for " + url); }
643 					 */
644 
645 				}
646 				_logger.info("Putting " + url
647 						+ " into the execute queue, queue size now "
648 						+ _urls.size());
649 
650 				r = _factory.getRetriever(url, this);
651 
652 				executor.execute(r);
653 				_logger.debug("Pool size is "
654 						+ ((ThreadPoolExecutor) executor).getPoolSize()
655 						+ ", queue size is "
656 						+ ((ThreadPoolExecutor) executor).getQueue().size());
657 
658 			}
659 		}
660 
661 		shutdownAndAwaitTermination(executor);
662 		_logger.info("Scutter task exiting.");
663 	}
664 
665 	/**
666 	 * Add a single URL to the queue.
667 	 * 
668 	 * @param url
669 	 */
670 	public void addURL(URL url) {
671 		synchronized (_urls) {
672 			if (_visited.contains(url)) {
673 				_logger.debug("Already visited : " + url);
674 			} else if (inBlacklist(url)) {
675 				_logger.info("Blacklisted domain : " + url);
676 			} else if (_domainPattern != null
677 					&& !_domainPattern.matcher(url.toString()).matches()
678 					&& !url.getHost().equalsIgnoreCase("localhost")) {
679 				_logger.info("Not on whitelist : " + url);
680 			} else {
681 				_urls.add(url);
682 				addVisited(url);
683 			}
684 		}
685 	}
686 
687 	/**
688 	 * Check if a URL is on the blacklist.
689 	 * 
690 	 * @param url
691 	 * @return
692 	 */
693 	protected boolean inBlacklist(URL url) {
694 
695 		String host = Scutter.getHost(url);
696 		if (_autoBlackList
697 				&& _domains.containsKey(host)
698 				&& ((Integer) _domains.get(host)).intValue() > BLACKLIST_THRESHOLD) {
699 			return true;
700 		} else {
701 			return false;
702 		}
703 
704 	}
705 
706 	private static String getHost(URL url) {
707 		String host = url.getHost();
708 		/*
709 		 * Doesn't work for addresses such as example.co.uk //Get upper two
710 		 * levels of domain name String[] parts = host.split("\\."); if
711 		 * (parts.length > 1) { return parts[parts.length-2] + '.' +
712 		 * parts[parts.length-1]; } else { return host; }
713 		 */
714 		return host;
715 
716 	}
717 
718 	/**
719 	 * @return Returns the blacklistFile.
720 	 */
721 	public File getBlacklistFile() {
722 		return _blacklistFile;
723 	}
724 
725 	/**
726 	 * @param blacklistFile
727 	 *            The blacklistFile to set.
728 	 */
729 	public void setBlacklistFile(File blacklistFile) {
730 		_blacklistFile = blacklistFile;
731 	}
732 
733 	/**
734 	 * @return Returns the queueFile.
735 	 */
736 	public File getQueueFile() {
737 		return _queueFile;
738 	}
739 
740 	/**
741 	 * @param queueFile
742 	 *            The queueFile to set.
743 	 */
744 	public void setQueueFile(File queueFile) {
745 		_queueFile = queueFile;
746 	}
747 
748 	public void setContext(Resource[] context) {
749 		// TODO Auto-generated method stub
750 		_context = context;
751 	}
752 	public Resource[] getContext() {
753 		// TODO Auto-generated method stub
754 		return _context;
755 	}
756 	
757 	
758 }