diff --git a/progress.py b/progress.py new file mode 100755 index 0000000..311bdc1 --- /dev/null +++ b/progress.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 + +import matplotlib.pyplot as plt +import xml.etree.ElementTree as ET +from datetime import date, time, datetime +from dataclasses import dataclass +from glob import glob +from typing import Sequence +from functools import reduce + +VOLICU_CELKEM = 8_245_962 +OKRSKU_CELKEM = 14_857 +kandidati_jmena = { + 1: 'Pavel Fischer', + 2: 'Jaroslav Bašta', + 4: 'Petr Pavel', + 5: 'Tomáš Zima', + 6: 'Danuše Nerudová', + 7: 'Andrej Babiš', + 8: 'Karel Diviš', + 9: 'Marek Hilšer', + } + +@dataclass +class VysledekOkrsku: + poradi: int + timestamp: datetime + kandidati: dict[str, int] + ucast: dict + + def dopln_vysledek(self): + assert self.poradi != 0, "Dávka má mít kladné pořadí" + self.kandidati['NEPLATNÉ'] = int(self.ucast['ODEVZDANE_OBALKY']) - int(self.ucast['PLATNE_HLASY']) + self.kandidati['NEVOLILI'] = int(self.ucast['ZAPSANI_VOLICI']) - int(self.ucast['ODEVZDANE_OBALKY']) + # Ztracené (vydané a neodevzdané) obálky neřešíme + # Nezapočítané hlasy neexistují na úrovni okrsku + +def nacti_davku(fn) -> dict[tuple[int, int], VysledekOkrsku]: + okrsky: dict[tuple[int, int], VysledekOkrsku] = {} + + with open(fn, 'r') as f: + root = ET.parse(f).getroot() + NS = {'': r'http://www.volby.cz/prezident/'} + + def zpracuj_okrsek(elem) -> tuple[VysledekOkrsku, tuple[int, int]]: + idOkrsku = tuple(map(int, (elem.get(x) for x in ['CIS_OBEC', 'CIS_OKRSEK']))) + kandidati = {} + for kandidat in elem.findall('./HLASY_OKRSEK', NS): + cislo = int(kandidat.attrib['PORADOVE_CISLO']) + hlasy = int(kandidat.attrib['HLASY']) + jmeno = kandidati_jmena[cislo] + kandidati[jmeno] = hlasy + return VysledekOkrsku( + poradi = int(elem.attrib['PORADI_ZPRAC']), + timestamp = datetime.fromisoformat(elem.attrib['DATUM_CAS_ZPRAC']), + ucast = elem.find('./UCAST_OKRSEK', NS).attrib, + kandidati = kandidati, + ), idOkrsku + + for okrsek in root.findall('./OKRSEK', NS): + vysl, id = zpracuj_okrsek(okrsek) + vysl.dopln_vysledek() + okrsky[id] = vysl + + return okrsky + +def zpracuj_davky(fns: Sequence[str]): + # Zpracovávací fáze: načteme dávku, nahradíme předchozí instance pro okrsky + # Počítací fáze: sečteme hlasy za všechny okrsky, vyrobíme dump nebo koláč ke každému času + # Uložíme pod timestampem do nějaké složky. + # Tyto dvě fáze ale musí probíhat zároveň, pokud chceme mít vykreslený graf za každý okamžik předání + # Jiný program: vezme dumpy/koláče a naskládá je správně do videa (případně provede jinou analýzu) + + okrsky: dict[tuple[int, int], VysledekOkrsku] = {} + celkovy_vysledek: dict[str, int] = {} + for fn in fns: + print(f'Zpracovávám dávku {fn}') + nove = nacti_davku(fn) + podle_casu = sorted(nove.items(), key=lambda x: x[1].timestamp) + for okrsek, vysledek_okrsku in podle_casu: + # Přidat okrsek do dat, přepočítat a exportovat výsledek + if okrsek in okrsky: + assert vysledek_okrsku.poradi > okrsky[okrsek].poradi + # Odečteme předchozí výsledek tohohle okrsku + for k in okrsky[okrsek].kandidati: + celkovy_vysledek[k] -= okrsky[okrsek].kandidati[k] + okrsky[okrsek] = vysledek_okrsku + for k in vysledek_okrsku.kandidati: + if k not in celkovy_vysledek: celkovy_vysledek[k] = 0 + celkovy_vysledek[k] += vysledek_okrsku.kandidati[k] + # Počítání dat: + ts = vysledek_okrsku.timestamp + celkovy_vysledek['NEZAPOČÍTANÉ'] = 0 # del který funguje i v první iteraci :-) + celkovy_vysledek['NEZAPOČÍTANÉ'] = VOLICU_CELKEM - sum(celkovy_vysledek.values()) + # Vizualizace / export + # FIXME: Zkopírováno, negenerické, fuj. Má používat nějaký dedikovaný spoolečný kód. + plt.clf() + # Pro srovnání: zpracování okrsků + # TODO: WTF: když tohle nakreslím až po kandidátech, tak se + # kandidáti vyrenderují mimo bbox. + okrsku_secteno = len(okrsky.keys()) + okrsku_zbyva = OKRSKU_CELKEM - okrsku_secteno + plt.pie( + [okrsku_secteno, okrsku_zbyva], + colors=['#000088', '#cccccc'], + center=(1, -1), radius=0.3, + ) + + # Zastoupení kandidátů + order = ( + 'Petr Pavel', + 'Danuše Nerudová', + 'Marek Hilšer', + 'Pavel Fischer', + 'Karel Diviš', + 'Tomáš Zima', + 'Jaroslav Bašta', + 'Andrej Babiš', + 'NEPLATNÉ', + 'NEVOLILI', + 'NEZAPOČÍTANÉ', + ) + # FIXME: průběžné výsledky nemají některé kandidáty. Tohle bychom neměli opravovat tady. + x = [celkovy_vysledek.get(i, 0) for i in order] + print(ts, x) + labels = order + label = str(ts) + colors = { + 'Petr Pavel': '#627210', + 'Danuše Nerudová': '#811367', + 'Andrej Babiš': '#262161', + 'Jaroslav Bašta': '#B51119', + 'Marek Hilšer': '#CA834E', + 'Pavel Fischer': '#244C76', + 'Karel Diviš': '#3B6E5D', + 'Tomáš Zima': '#E5DE1A', + 'NEPLATNÉ': '#000000', + 'NEVOLILI': '#666666', + 'NEZAPOČÍTANÉ': '#CCCCCC', + } + plt.pie(x, labels=labels, colors=[colors[i] for i in order], autopct='%1.3f %%') + plt.text(0, -1.2, label, ha='center') + + # save + fn = f'progress_png/{ts}.png' + plt.savefig(fn) + + +zpracuj_davky( + sorted(glob('davky/*.xml')) + ) diff --git a/progress_vid.py b/progress_vid.py new file mode 100755 index 0000000..1605f86 --- /dev/null +++ b/progress_vid.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python3 + +import matplotlib.pyplot as plt +import xml.etree.ElementTree as ET +from datetime import date, time, datetime, timedelta +from dataclasses import dataclass +from glob import glob +from typing import Sequence +from functools import reduce + +VOLICU_CELKEM = 8_245_962 +OKRSKU_CELKEM = 14_857 +START = datetime.fromisoformat('2023-01-14T14:00:00') +kandidati_jmena = { + 1: 'Pavel Fischer', + 2: 'Jaroslav Bašta', + 4: 'Petr Pavel', + 5: 'Tomáš Zima', + 6: 'Danuše Nerudová', + 7: 'Andrej Babiš', + 8: 'Karel Diviš', + 9: 'Marek Hilšer', + } + +@dataclass +class VysledekOkrsku: + poradi: int + timestamp: datetime + kandidati: dict[str, int] + ucast: dict + + def dopln_vysledek(self): + assert self.poradi != 0, "Dávka má mít kladné pořadí" + self.kandidati['NEPLATNÉ'] = int(self.ucast['ODEVZDANE_OBALKY']) - int(self.ucast['PLATNE_HLASY']) + self.kandidati['NEVOLILI'] = int(self.ucast['ZAPSANI_VOLICI']) - int(self.ucast['ODEVZDANE_OBALKY']) + # Ztracené (vydané a neodevzdané) obálky neřešíme + # Nezapočítané hlasy neexistují na úrovni okrsku + +def nacti_davku(fn) -> dict[tuple[int, int], VysledekOkrsku]: + okrsky: dict[tuple[int, int], VysledekOkrsku] = {} + + with open(fn, 'r') as f: + root = ET.parse(f).getroot() + NS = {'': r'http://www.volby.cz/prezident/'} + + def zpracuj_okrsek(elem) -> tuple[VysledekOkrsku, tuple[int, int]]: + idOkrsku = tuple(map(int, (elem.get(x) for x in ['CIS_OBEC', 'CIS_OKRSEK']))) + kandidati = {} + for kandidat in elem.findall('./HLASY_OKRSEK', NS): + cislo = int(kandidat.attrib['PORADOVE_CISLO']) + hlasy = int(kandidat.attrib['HLASY']) + jmeno = kandidati_jmena[cislo] + kandidati[jmeno] = hlasy + return VysledekOkrsku( + poradi = int(elem.attrib['PORADI_ZPRAC']), + timestamp = datetime.fromisoformat(elem.attrib['DATUM_CAS_ZPRAC']), + ucast = elem.find('./UCAST_OKRSEK', NS).attrib, + kandidati = kandidati, + ), idOkrsku + + for okrsek in root.findall('./OKRSEK', NS): + vysl, id = zpracuj_okrsek(okrsek) + vysl.dopln_vysledek() + okrsky[id] = vysl + + return okrsky + +@dataclass +class KolacovaData: + kandidati: dict[str, int] + timestamp: datetime + secteno_okrsku: int + +def zpracuj_davky(fns: Sequence[str]): + # Zpracovávací fáze: načteme dávku, nahradíme předchozí instance pro okrsky + # Počítací fáze: sečteme hlasy za všechny okrsky, vyrobíme dump nebo koláč ke každému času + # Uložíme pod timestampem do nějaké složky. + # Tyto dvě fáze ale musí probíhat zároveň, pokud chceme mít vykreslený graf za každý okamžik předání + # Jiný program: vezme dumpy/koláče a naskládá je správně do videa (případně provede jinou analýzu) + + okrsky: dict[tuple[int, int], VysledekOkrsku] = {} + celkovy_vysledek: dict[str, int] = {} + data: list[KolacovaData] = [] + for fn in fns: + print(f'Zpracovávám dávku {fn}') + nove = nacti_davku(fn) + podle_casu = sorted(nove.items(), key=lambda x: x[1].timestamp) + for okrsek, vysledek_okrsku in podle_casu: + # Přidat okrsek do dat, přepočítat a exportovat výsledek + if okrsek in okrsky: + assert vysledek_okrsku.poradi > okrsky[okrsek].poradi + # Odečteme předchozí výsledek tohohle okrsku + for k in okrsky[okrsek].kandidati: + celkovy_vysledek[k] -= okrsky[okrsek].kandidati[k] + okrsky[okrsek] = vysledek_okrsku + for k in vysledek_okrsku.kandidati: + if k not in celkovy_vysledek: celkovy_vysledek[k] = 0 + celkovy_vysledek[k] += vysledek_okrsku.kandidati[k] + # Počítání dat: + ts = vysledek_okrsku.timestamp + celkovy_vysledek['NEZAPOČÍTANÉ'] = 0 # del který funguje i v první iteraci :-) + celkovy_vysledek['NEZAPOČÍTANÉ'] = VOLICU_CELKEM - sum(celkovy_vysledek.values()) + # Vizualizace / export + data.append(KolacovaData( + kandidati = dict(celkovy_vysledek), + timestamp = ts, + secteno_okrsku = len(okrsky.keys()) + )) + return data + +# FIXME: ne globálním datům! +data = [KolacovaData( + kandidati={'NEZAPOČÍTANÉ': VOLICU_CELKEM}, + timestamp=START, + secteno_okrsku=0, + )] +data.extend(zpracuj_davky(sorted(glob('davky/*.xml')))) + +# Reálně ale kašleme na jednotlivé sekundy, takže z každé vezmeme jen poslední výsledek +po_sekundach = {} +for x in data: + po_sekundach[x.timestamp] = x +casy = sorted(po_sekundach.keys()) + +def frametime_to_realtime(t): + # Až na úplný konec tak sčítání těchto voleb trvalo asi 3 hodiny. Řekněme, + # že cílíme na cca 10minutové video, tedy zrychlení 18x. + delta = timedelta(minutes=t.minute, hours=t.hour, seconds=t.second, microseconds=t.microsecond) + delta *= 18 + return START + delta + +from bisect import bisect_right +def data_for_timestamp(ts) -> KolacovaData: + idx = bisect_right(casy, ts) + return po_sekundach[casy[idx-1]] # bisect_right vrací konzistentně hodnotu o jedna větší, než potřebujeme. + + +def visualize_data(data): + # Tady se provede ta vizualizace, kterou děláme pořád dokolečka. + # FIXME: zase jsem to zkopíroval :-P + plt.clf() + # Pro srovnání: zpracování okrsků + # TODO: WTF: když tohle nakreslím až po kandidátech, tak se + # kandidáti vyrenderují mimo bbox. + okrsku_zbyva = OKRSKU_CELKEM - data.secteno_okrsku + plt.pie( + [data.secteno_okrsku, okrsku_zbyva], + colors=['#000088', '#cccccc'], + center=(1, -1), radius=0.3, + ) + + # Zastoupení kandidátů + order = ( + 'Petr Pavel', + 'Danuše Nerudová', + 'Marek Hilšer', + 'Pavel Fischer', + 'Karel Diviš', + 'Tomáš Zima', + 'Jaroslav Bašta', + 'Andrej Babiš', + 'NEPLATNÉ', + 'NEVOLILI', + 'NEZAPOČÍTANÉ', + ) + # FIXME: průběžné výsledky nemají některé kandidáty. Tohle bychom neměli opravovat tady. + x = [data.kandidati.get(i, 0) for i in order] + labels = order + label = str(data.timestamp) + colors = { + 'Petr Pavel': '#627210', + 'Danuše Nerudová': '#811367', + 'Andrej Babiš': '#262161', + 'Jaroslav Bašta': '#B51119', + 'Marek Hilšer': '#CA834E', + 'Pavel Fischer': '#244C76', + 'Karel Diviš': '#3B6E5D', + 'Tomáš Zima': '#E5DE1A', + 'NEPLATNÉ': '#000000', + 'NEVOLILI': '#666666', + 'NEZAPOČÍTANÉ': '#CCCCCC', + } + plt.pie(x, labels=labels, colors=[colors[i] for i in order], autopct='%1.3f %%') + plt.text(0, -1.2, label, ha='center') + + # save + return plt.gcf() + +# https://zulko.github.io/moviepy/getting_started/working_with_matplotlib.html +from moviepy.editor import VideoClip +from moviepy.video.io.bindings import mplfig_to_npimage + +def make_frame(t): + # wtf is t? + data: KolacovaData + #data = data_for_timestamp(frametime_to_realtime(t)) + data = data_for_timestamp(START + timedelta(seconds=t)*18) + fig = visualize_data(data) + return mplfig_to_npimage(fig) + +def make_anim(): + anim = VideoClip(make_frame, duration=600) + anim.write_videofile('progress.mkv', codec='hevc', fps=24) + +make_anim()