importastimportdatetimeimportosimportrefromcollectionsimportdefaultdictfromlogparserimportparsefromlogparser.commonimportDATETIME_PATTERN,Common# Kingfisher Collect logs an INFO message starting with "Spider arguments:".SPIDER_ARGUMENTS_SEARCH_STRING=" INFO: Spider arguments: "MAXIMUM_TIMEDELTA=3# Hotfix: https://github.com/my8100/logparser/pull/19Common.SIGTERM_PATTERN=re.compile(r"^%s[ ].+?:[ ](Received[ ]SIG(?:BREAK|INT|TERM)([ ]twice)?),"%DATETIME_PATTERN)# noqa: UP031
[docs]classScrapyLogFile:"""A representation of a Scrapy log file."""
[docs]@classmethoddeffind(cls,logs_directory,source_id,data_version):""" Find and return the first matching log file for the given crawl. :param str logs_directory: Kingfisher Collect's project directory within Scrapyd's logs_dir directory :param str source_id: the spider's name :param datetime.datetime data_version: the crawl directory's name, parsed as a datetime """source_directory=os.path.join(logs_directory,source_id)ifos.path.isdir(source_directory):withos.scandir(source_directory)asit:forentryinit:ifentry.name.endswith(".log"):scrapy_log_file=ScrapyLogFile(entry.path)ifscrapy_log_file.match(data_version):returnscrapy_log_filereturnNone
def__init__(self,name:str,text:str="")->None:""" :param name: the full path to the log file :param text: the text content of the log file """self.name=nameself.text=textself._logparser=Noneself._item_counts=Noneself._spider_arguments=None
[docs]defdelete(self):"""Delete the log file and any log summary ending in ``.stats``."""ifself.name:ifos.path.isfile(self.name):os.remove(self.name)summary=f"{self.name}.stats"ifos.path.isfile(summary):os.remove(summary)
# Logparser processing@propertydeflogparser(self)->dict:"""Return the output of `logparser <https://pypi.org/project/logparser/>`__."""ifself._logparserisNone:# `taillines=0` sets the 'tail' key to all lines, so we set it to 1.self._logparser=parse(self.read(),headlines=0,taillines=1)returnself._logparser
[docs]defmatch(self,data_version)->bool:""" Return whether the crawl directory's name, parsed as a datetime, is less than 3 seconds after the log file's start time. """return0<=data_version.timestamp()-self.crawl_time.timestamp()<MAXIMUM_TIMEDELTA
@propertydefcrawl_time(self)->datetime.datetime:""" Return the ``crawl_time`` spider argument if set, or the ``start_time`` crawl statistic otherwise. If neither is logged, return the time of the first log message. """crawl_time=self.spider_arguments.get("crawl_time")ifcrawl_time:returndatetime.datetime.strptime(crawl_time,"%Y-%m-%dT%H:%M:%S")if"start_time"inself.logparser["crawler_stats"]:returneval(self.logparser["crawler_stats"]["start_time"]).replace(microsecond=0)# noqa: S307returndatetime.datetime.fromtimestamp(self.logparser["first_log_timestamp"])
[docs]defis_finished(self)->bool:""" Return whether the log file contains a "Spider closed (finished)" log message or a ``finish_reason`` crawl statistic set to "finished". """# See https://kingfisher-collect.readthedocs.io/en/latest/logs.html#check-the-reason-for-closing-the-spider# logparser's `finish_reason` is "N/A" for an unclean shutdown, because crawl statistics aren't logged.returnself.logparser["finish_reason"]=="finished"
# Line-by-line processing@propertydefitem_counts(self)->dict:"""Return the number of each type of item, according to the log file."""ifself._item_countsisNone:self._process_line_by_line()returnself._item_counts@propertydefspider_arguments(self)->dict:"""Return the spider's arguments."""ifself._spider_argumentsisNone:self._process_line_by_line()returnself._spider_arguments
[docs]defis_complete(self)->bool:"""Return whether the crawl collected a subset of the dataset, according to the log file."""# See https://kingfisher-collect.readthedocs.io/en/latest/spiders.html#spider-argumentsreturnnotany(self.spider_arguments.get(arg)forargin("from_date","until_date","portal","publisher","system","sample","path","qs:",))
[docs]defread(self):"""Return the text content of the log file."""ifself.text:returnself.textwithopen(self.name)asf:returnf.read()
def__iter__(self):"""Yield each line of the log file."""ifself.text:forlineinself.text.splitlines(keepends=True):yieldlineelse:withopen(self.name)asf:forlineinf:yieldlinedef_process_line_by_line(self)->None:self._item_counts=defaultdict(int)self._spider_arguments={}buffer=[]forlineinself:ifbufferorline.startswith("{"):buffer.append(line.rstrip())ifbufferandbuffer[-1].endswith("}"):try:# Kingfisher Collect's LogFormatter logs scraped items as dicts that use only simple types,# so `ast.literal_eval` is safe.item=ast.literal_eval("".join(buffer))if"number"initem:self._item_counts["FileItem"]+=1elif"data_type"initem:self._item_counts["File"]+=1exceptValueError:# Scrapy dumps stats as a dict, which uses `datetime.datetime` types that can't be parsed with# `ast.literal_eval`.passbuffer=[]index=line.find(SPIDER_ARGUMENTS_SEARCH_STRING)ifindex>-1:# `eval` is used, because the string can contain `datetime.date` and is written by trusted code in# Kingfisher Collect. Otherwise, we can modify the string so that `ast.literal_eval` can be used.self._spider_arguments=eval(line[index+len(SPIDER_ARGUMENTS_SEARCH_STRING):])# noqa: S307# Mixed processing@propertydeferror_rate(self)->float:""" Return an estimated lower bound of the true error rate. Kingfisher Collect is expected to yield at most one ERROR message per request leading to a File item, so the true error rate can only be less than this estimated lower bound if Kingfisher Collect breaks this expectation. On the other hand, the true error rate can easily be higher than the estimated lower bound; for example: - If the spider crawls 10 URLs, each returning 99 URLs, each returning OCDS data, and the requests for 5 of the 10 fail, then the estimated lower bound is 5 / 500 (1%), though the true error rate is 50%. - Similarly if the spider crawls 10 archive files, each containing 99 OCDS files, or 10 JSON files each containing 99 release packages. """# Kingfisher Collect logs retrieval errors as ERROR messages.error_count=self.logparser["log_categories"]["error_logs"]["count"]+self.logparser["crawler_stats"].get("invalid_json_count",0)try:returnerror_count/(self.item_counts["File"]+self.item_counts["FileItem"]+error_count)exceptZeroDivisionError:return1