#!/usr/bin/env python3
"""Parse web server access logs and write daily visitor statistics as CSV."""

from collections import Counter, defaultdict
import csv
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
import re
from sys import argv
from urllib.parse import urlparse
from typing import Dict, List, Tuple
from warnings import warn

import user_agents

# Matches successful (status 200) GET requests in combined log format.
# Query strings are split off the resource and the referrer so that only
# the bare paths are captured.
ACCESS_RE = re.compile(' '.join((
    r'(?P<address>\S+)',
    r'\S+',
    r'\S+',
    r'(?P<date>\[\S+ \S+\])',
    r'"GET (?P<resource>[^ ?]+)(\?\S+)? [^"]+"',
    r'200 [0-9]+',
    r'"(?P<referer>[^"?]+)(\?\S+)?"',
    r'"(?P<useragent>[^"]+)"'
)))
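# A hedged illustration, not a real log entry: the kind of combined-format
# line ACCESS_RE is written for. The named groups capture the address, date,
# resource, referer and useragent fields; anything that is not a successful
# GET fails to match and is dropped by parse() below.
#
#   203.0.113.7 - - [10/Oct/2023:13:55:36 +0200] "GET /concerts.html HTTP/1.1" 200 5120 "https://quatuorbellefeuille.com/" "Mozilla/5.0 (X11; Linux x86_64)"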
DATE_FMT = '[%d/%b/%Y:%H:%M:%S %z]'
# Accesses further apart than this count as separate visits.
VISIT_MAX_DURATION = timedelta(hours=1)
# Referrers on these domains are internal and reported by path instead.
DOMAINS = {
    'quatuorbellefeuille.com',
    'quatuorbellefeuille.fr',
    'klg.uber.space',
}


def normalize_path(p):
    if p == '/':
        return '/index.html'
    return p


@dataclass
class Access:
    address: str
    useragent: str
    referrer: str
    time: datetime
    resource: str

    @classmethod
    def from_log(cls, info):
        resource = normalize_path(info['resource'])
        referrer = urlparse(info['referer'])
        if referrer.netloc in DOMAINS:
            # Internal referrer: keep only the normalized path.
            ref = normalize_path(referrer.path)
        else:
            # External referrer: keep only the domain.
            ref = referrer.netloc
        return cls(
            info['address'],
            info['useragent'],
            ref,
            datetime.strptime(info['date'], DATE_FMT),
            resource,
        )


def interesting(resource):
    return resource.endswith('.html') or resource == '/'


def parse(logs_paths):
    logs = []
    for lp in logs_paths:
        with open(lp) as logs_file:
            logs += logs_file.read().splitlines()
    matches = (ACCESS_RE.match(line) for line in logs)
    return tuple(
        Access.from_log(m)
        for m in matches
        if m is not None and interesting(m['resource'])
    )


Visit = List[Access]


@dataclass
class Visitor:
    address: str
    useragent: str  # the kind returned by useragent_kind(), not the raw string
    referrers: List[str]
    visits: List[Visit]


def useragent_kind(ua_string):
    ua = user_agents.parse(ua_string)
    if ua.is_pc:
        return 'pc'
    if ua.is_mobile:
        return 'mobile'
    if ua.is_tablet:
        return 'tablet'
    if ua.is_bot:
        return 'bot'
    warn(f'Unknown user agent kind: {ua_string}')
    return 'n/a'


def sort_visits(accesses):
    """Group accesses into visitors keyed by (address, user agent string)."""
    visitors: Dict[Tuple[str, str], Visitor] = {}
    for a in accesses:
        key = (a.address, a.useragent)
        visitor = visitors.get(key)
        if visitor is None:
            visitor = Visitor(
                a.address,
                useragent_kind(a.useragent),
                [a.referrer],
                [[a]],
            )
            visitors[key] = visitor
            continue
        last_visit = visitor.visits[-1]
        last_access = last_visit[-1].time
        if a.time - last_access < VISIT_MAX_DURATION:
            last_visit.append(a)
            continue
        # Too long since the last access: record the referrer that opened
        # the new visit and start it.
        visitor.referrers.append(a.referrer)
        visitor.visits.append([a])
    return visitors


def find_days(visits):
    return {
        v[0].time.replace(hour=0, minute=0, second=0)
        for v in visits
    }


def daily_visitors(visitors, output_path):
    days: Dict[datetime, Counter] = defaultdict(Counter)
    columns = ('mobile', 'tablet', 'pc', 'bot', 'n/a')
    print('Visitors:')
    for v in visitors.values():
        for day in find_days(v.visits):
            days[day][v.useragent] += 1
    with open(output_path, 'w') as f:
        out = csv.writer(f)
        out.writerow(('day', 'total', *columns))
        print('day', 'total', *columns, sep='\t')
        for day in sorted(days):
            counter = days[day]
            counters = tuple(counter[c] for c in columns)
            values = (day.strftime('%Y-%m-%d'), sum(counters), *counters)
            out.writerow(values)
            print(*values, sep='\t')


def daily_stats(visitors, output_dir):
    output_dir = Path(output_dir)
    daily_visitors(visitors, output_dir.joinpath('dailyvisitors.csv'))
    # daily_visits(visitors, output_dir.joinpath('dailyvisits.csv'))
    # daily_pages_per_visit(visitors, output_dir.joinpath('dailypagespervisit.csv'))
    # daily_page_hits(visitors, output_dir.joinpath('dailypagehits.csv'))
    # daily_referrers(visitors, output_dir.joinpath('dailyreferrers.csv'))


def global_stats(visitors, output_dir):
    pass


def main(logs_paths, output_dir):
    accesses = parse(logs_paths)
    visitors = sort_visits(accesses)
    daily_stats(visitors, output_dir)
    global_stats(visitors, output_dir)


if __name__ == '__main__':
    main(argv[1:-1], argv[-1])
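# Example invocation, with hypothetical file names: every argument except the
# last names an access log to read; the last names the directory the CSV
# files are written to.
#
#   ./analyze_logs.py access.log access.log.1 stats/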