#!/usr/bin/env python3 import matplotlib.pyplot as plt import xml.etree.ElementTree as ET from datetime import date, time, datetime, timedelta from dataclasses import dataclass from glob import glob from typing import Sequence from functools import reduce, cache from bisect import bisect_right VOLICU_CELKEM = 8_245_962 OKRSKU_CELKEM = 14_857 START = datetime.fromisoformat('2023-01-14T14:00:00') kandidati_jmena = { 1: 'Pavel Fischer', 2: 'Jaroslav Bašta', 4: 'Petr Pavel', 5: 'Tomáš Zima', 6: 'Danuše Nerudová', 7: 'Andrej Babiš', 8: 'Karel Diviš', 9: 'Marek Hilšer', } @dataclass class VysledekOkrsku: poradi: int timestamp: datetime kandidati: dict[str, int] ucast: dict def dopln_vysledek(self): assert self.poradi != 0, "Dávka má mít kladné pořadí" self.kandidati['NEPLATNÉ'] = int(self.ucast['ODEVZDANE_OBALKY']) - int(self.ucast['PLATNE_HLASY']) self.kandidati['NEVOLILI'] = int(self.ucast['ZAPSANI_VOLICI']) - int(self.ucast['ODEVZDANE_OBALKY']) # Ztracené (vydané a neodevzdané) obálky neřešíme # Nezapočítané hlasy neexistují na úrovni okrsku def nacti_davku(fn) -> dict[tuple[int, int], VysledekOkrsku]: okrsky: dict[tuple[int, int], VysledekOkrsku] = {} with open(fn, 'r') as f: root = ET.parse(f).getroot() NS = {'': r'http://www.volby.cz/prezident/'} def zpracuj_okrsek(elem) -> tuple[VysledekOkrsku, tuple[int, int]]: idOkrsku = tuple(map(int, (elem.get(x) for x in ['CIS_OBEC', 'CIS_OKRSEK']))) kandidati = {} for kandidat in elem.findall('./HLASY_OKRSEK', NS): cislo = int(kandidat.attrib['PORADOVE_CISLO']) hlasy = int(kandidat.attrib['HLASY']) jmeno = kandidati_jmena[cislo] kandidati[jmeno] = hlasy return VysledekOkrsku( poradi = int(elem.attrib['PORADI_ZPRAC']), timestamp = datetime.fromisoformat(elem.attrib['DATUM_CAS_ZPRAC']), ucast = elem.find('./UCAST_OKRSEK', NS).attrib, kandidati = kandidati, ), idOkrsku for okrsek in root.findall('./OKRSEK', NS): vysl, id = zpracuj_okrsek(okrsek) vysl.dopln_vysledek() okrsky[id] = vysl return okrsky @dataclass class KolacovaData: kandidati: dict[str, int] timestamp: datetime secteno_okrsku: int def zpracuj_davky(fns: Sequence[str]): # Zpracovávací fáze: načteme dávku, nahradíme předchozí instance pro okrsky # Počítací fáze: sečteme hlasy za všechny okrsky, vyrobíme dump nebo koláč ke každému času # Uložíme pod timestampem do nějaké složky. # Tyto dvě fáze ale musí probíhat zároveň, pokud chceme mít vykreslený graf za každý okamžik předání # Jiný program: vezme dumpy/koláče a naskládá je správně do videa (případně provede jinou analýzu) okrsky: dict[tuple[int, int], VysledekOkrsku] = {} celkovy_vysledek: dict[str, int] = {} data: list[KolacovaData] = [] for fn in fns: print(f'Zpracovávám dávku {fn}') nove = nacti_davku(fn) podle_casu = sorted(nove.items(), key=lambda x: x[1].timestamp) for okrsek, vysledek_okrsku in podle_casu: # Přidat okrsek do dat, přepočítat a exportovat výsledek if okrsek in okrsky: assert vysledek_okrsku.poradi > okrsky[okrsek].poradi # Odečteme předchozí výsledek tohohle okrsku for k in okrsky[okrsek].kandidati: celkovy_vysledek[k] -= okrsky[okrsek].kandidati[k] okrsky[okrsek] = vysledek_okrsku for k in vysledek_okrsku.kandidati: if k not in celkovy_vysledek: celkovy_vysledek[k] = 0 celkovy_vysledek[k] += vysledek_okrsku.kandidati[k] # Počítání dat: ts = vysledek_okrsku.timestamp celkovy_vysledek['NEZAPOČÍTANÉ'] = 0 # del který funguje i v první iteraci :-) celkovy_vysledek['NEZAPOČÍTANÉ'] = VOLICU_CELKEM - sum(celkovy_vysledek.values()) # Vizualizace / export data.append(KolacovaData( kandidati = dict(celkovy_vysledek), timestamp = ts, secteno_okrsku = len(okrsky.keys()) )) return data # FIXME: ne globálním datům! data = [KolacovaData( kandidati={'NEZAPOČÍTANÉ': VOLICU_CELKEM}, timestamp=START, secteno_okrsku=0, )] data.extend(zpracuj_davky(sorted(glob('davky/*.xml')))) # Reálně ale kašleme na jednotlivé sekundy, takže z každé vezmeme jen poslední výsledek po_sekundach = {} for x in data: po_sekundach[x.timestamp] = x casy = sorted(po_sekundach.keys()) def frametime_to_realtime(t): # Až na úplný konec tak sčítání těchto voleb trvalo asi 3 hodiny. Řekněme, # že cílíme na cca 10minutové video, tedy zrychlení 18x. delta = timedelta(minutes=t.minute, hours=t.hour, seconds=t.second, microseconds=t.microsecond) delta *= 18 return START + delta def visualize_data(data: KolacovaData): # Tady se provede ta vizualizace, kterou děláme pořád dokolečka. # FIXME: zase jsem to zkopíroval :-P plt.clf() # Pro srovnání: zpracování okrsků # TODO: WTF: když tohle nakreslím až po kandidátech, tak se # kandidáti vyrenderují mimo bbox. okrsku_zbyva = OKRSKU_CELKEM - data.secteno_okrsku plt.pie( [data.secteno_okrsku, okrsku_zbyva], colors=['#000088', '#cccccc'], center=(1, -1), radius=0.3, ) # Zastoupení kandidátů order = ( 'Petr Pavel', 'Danuše Nerudová', 'Marek Hilšer', 'Pavel Fischer', 'Karel Diviš', 'Tomáš Zima', 'Jaroslav Bašta', 'Andrej Babiš', 'NEPLATNÉ', 'NEVOLILI', 'NEZAPOČÍTANÉ', ) # FIXME: průběžné výsledky nemají některé kandidáty. Tohle bychom neměli opravovat tady. x = [data.kandidati.get(i, 0) for i in order] labels = order label = str(data.timestamp) colors = { 'Petr Pavel': '#627210', 'Danuše Nerudová': '#811367', 'Andrej Babiš': '#262161', 'Jaroslav Bašta': '#B51119', 'Marek Hilšer': '#CA834E', 'Pavel Fischer': '#244C76', 'Karel Diviš': '#3B6E5D', 'Tomáš Zima': '#E5DE1A', 'NEPLATNÉ': '#000000', 'NEVOLILI': '#666666', 'NEZAPOČÍTANÉ': '#CCCCCC', } plt.pie(x, labels=labels, colors=[colors[i] for i in order], autopct='%1.3f %%') plt.text(0, -1.2, label, ha='center') # save return plt.gcf() # Jen kvůli cachování výsledků… @cache def figure_podle_casu(ts: datetime): return visualize_data(po_sekundach[ts]) # https://zulko.github.io/moviepy/getting_started/working_with_matplotlib.html from moviepy.editor import VideoClip from moviepy.video.io.bindings import mplfig_to_npimage def make_frame(t): # wtf is t? ts = START + timedelta(seconds = t*18) idx = bisect_right(casy, ts) - 1 fig = figure_podle_casu(casy[idx]) return mplfig_to_npimage(fig) def make_anim(): anim = VideoClip(make_frame, duration=600) anim.write_videofile('progress.mkv', codec='hevc', fps=24) make_anim()