You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
152 lines
4.9 KiB
Python
152 lines
4.9 KiB
Python
2 years ago
|
#!/usr/bin/env python3
|
||
|
|
||
|
import matplotlib.pyplot as plt
|
||
|
import xml.etree.ElementTree as ET
|
||
|
from datetime import date, time, datetime
|
||
|
from dataclasses import dataclass
|
||
|
from glob import glob
|
||
|
from typing import Sequence
|
||
|
from functools import reduce
|
||
|
|
||
|
VOLICU_CELKEM = 8_245_962
|
||
|
OKRSKU_CELKEM = 14_857
|
||
|
kandidati_jmena = {
|
||
|
1: 'Pavel Fischer',
|
||
|
2: 'Jaroslav Bašta',
|
||
|
4: 'Petr Pavel',
|
||
|
5: 'Tomáš Zima',
|
||
|
6: 'Danuše Nerudová',
|
||
|
7: 'Andrej Babiš',
|
||
|
8: 'Karel Diviš',
|
||
|
9: 'Marek Hilšer',
|
||
|
}
|
||
|
|
||
|
@dataclass
|
||
|
class VysledekOkrsku:
|
||
|
poradi: int
|
||
|
timestamp: datetime
|
||
|
kandidati: dict[str, int]
|
||
|
ucast: dict
|
||
|
|
||
|
def dopln_vysledek(self):
|
||
|
assert self.poradi != 0, "Dávka má mít kladné pořadí"
|
||
|
self.kandidati['NEPLATNÉ'] = int(self.ucast['ODEVZDANE_OBALKY']) - int(self.ucast['PLATNE_HLASY'])
|
||
|
self.kandidati['NEVOLILI'] = int(self.ucast['ZAPSANI_VOLICI']) - int(self.ucast['ODEVZDANE_OBALKY'])
|
||
|
# Ztracené (vydané a neodevzdané) obálky neřešíme
|
||
|
# Nezapočítané hlasy neexistují na úrovni okrsku
|
||
|
|
||
|
def nacti_davku(fn) -> dict[tuple[int, int], VysledekOkrsku]:
|
||
|
okrsky: dict[tuple[int, int], VysledekOkrsku] = {}
|
||
|
|
||
|
with open(fn, 'r') as f:
|
||
|
root = ET.parse(f).getroot()
|
||
|
NS = {'': r'http://www.volby.cz/prezident/'}
|
||
|
|
||
|
def zpracuj_okrsek(elem) -> tuple[VysledekOkrsku, tuple[int, int]]:
|
||
|
idOkrsku = tuple(map(int, (elem.get(x) for x in ['CIS_OBEC', 'CIS_OKRSEK'])))
|
||
|
kandidati = {}
|
||
|
for kandidat in elem.findall('./HLASY_OKRSEK', NS):
|
||
|
cislo = int(kandidat.attrib['PORADOVE_CISLO'])
|
||
|
hlasy = int(kandidat.attrib['HLASY'])
|
||
|
jmeno = kandidati_jmena[cislo]
|
||
|
kandidati[jmeno] = hlasy
|
||
|
return VysledekOkrsku(
|
||
|
poradi = int(elem.attrib['PORADI_ZPRAC']),
|
||
|
timestamp = datetime.fromisoformat(elem.attrib['DATUM_CAS_ZPRAC']),
|
||
|
ucast = elem.find('./UCAST_OKRSEK', NS).attrib,
|
||
|
kandidati = kandidati,
|
||
|
), idOkrsku
|
||
|
|
||
|
for okrsek in root.findall('./OKRSEK', NS):
|
||
|
vysl, id = zpracuj_okrsek(okrsek)
|
||
|
vysl.dopln_vysledek()
|
||
|
okrsky[id] = vysl
|
||
|
|
||
|
return okrsky
|
||
|
|
||
|
def zpracuj_davky(fns: Sequence[str]):
|
||
|
# Zpracovávací fáze: načteme dávku, nahradíme předchozí instance pro okrsky
|
||
|
# Počítací fáze: sečteme hlasy za všechny okrsky, vyrobíme dump nebo koláč ke každému času
|
||
|
# Uložíme pod timestampem do nějaké složky.
|
||
|
# Tyto dvě fáze ale musí probíhat zároveň, pokud chceme mít vykreslený graf za každý okamžik předání
|
||
|
# Jiný program: vezme dumpy/koláče a naskládá je správně do videa (případně provede jinou analýzu)
|
||
|
|
||
|
okrsky: dict[tuple[int, int], VysledekOkrsku] = {}
|
||
|
celkovy_vysledek: dict[str, int] = {}
|
||
|
for fn in fns:
|
||
|
print(f'Zpracovávám dávku {fn}')
|
||
|
nove = nacti_davku(fn)
|
||
|
podle_casu = sorted(nove.items(), key=lambda x: x[1].timestamp)
|
||
|
for okrsek, vysledek_okrsku in podle_casu:
|
||
|
# Přidat okrsek do dat, přepočítat a exportovat výsledek
|
||
|
if okrsek in okrsky:
|
||
|
assert vysledek_okrsku.poradi > okrsky[okrsek].poradi
|
||
|
# Odečteme předchozí výsledek tohohle okrsku
|
||
|
for k in okrsky[okrsek].kandidati:
|
||
|
celkovy_vysledek[k] -= okrsky[okrsek].kandidati[k]
|
||
|
okrsky[okrsek] = vysledek_okrsku
|
||
|
for k in vysledek_okrsku.kandidati:
|
||
|
if k not in celkovy_vysledek: celkovy_vysledek[k] = 0
|
||
|
celkovy_vysledek[k] += vysledek_okrsku.kandidati[k]
|
||
|
# Počítání dat:
|
||
|
ts = vysledek_okrsku.timestamp
|
||
|
celkovy_vysledek['NEZAPOČÍTANÉ'] = 0 # del který funguje i v první iteraci :-)
|
||
|
celkovy_vysledek['NEZAPOČÍTANÉ'] = VOLICU_CELKEM - sum(celkovy_vysledek.values())
|
||
|
# Vizualizace / export
|
||
|
# FIXME: Zkopírováno, negenerické, fuj. Má používat nějaký dedikovaný spoolečný kód.
|
||
|
plt.clf()
|
||
|
# Pro srovnání: zpracování okrsků
|
||
|
# TODO: WTF: když tohle nakreslím až po kandidátech, tak se
|
||
|
# kandidáti vyrenderují mimo bbox.
|
||
|
okrsku_secteno = len(okrsky.keys())
|
||
|
okrsku_zbyva = OKRSKU_CELKEM - okrsku_secteno
|
||
|
plt.pie(
|
||
|
[okrsku_secteno, okrsku_zbyva],
|
||
|
colors=['#000088', '#cccccc'],
|
||
|
center=(1, -1), radius=0.3,
|
||
|
)
|
||
|
|
||
|
# Zastoupení kandidátů
|
||
|
order = (
|
||
|
'Petr Pavel',
|
||
|
'Danuše Nerudová',
|
||
|
'Marek Hilšer',
|
||
|
'Pavel Fischer',
|
||
|
'Karel Diviš',
|
||
|
'Tomáš Zima',
|
||
|
'Jaroslav Bašta',
|
||
|
'Andrej Babiš',
|
||
|
'NEPLATNÉ',
|
||
|
'NEVOLILI',
|
||
|
'NEZAPOČÍTANÉ',
|
||
|
)
|
||
|
# FIXME: průběžné výsledky nemají některé kandidáty. Tohle bychom neměli opravovat tady.
|
||
|
x = [celkovy_vysledek.get(i, 0) for i in order]
|
||
|
print(ts, x)
|
||
|
labels = order
|
||
|
label = str(ts)
|
||
|
colors = {
|
||
|
'Petr Pavel': '#627210',
|
||
|
'Danuše Nerudová': '#811367',
|
||
|
'Andrej Babiš': '#262161',
|
||
|
'Jaroslav Bašta': '#B51119',
|
||
|
'Marek Hilšer': '#CA834E',
|
||
|
'Pavel Fischer': '#244C76',
|
||
|
'Karel Diviš': '#3B6E5D',
|
||
|
'Tomáš Zima': '#E5DE1A',
|
||
|
'NEPLATNÉ': '#000000',
|
||
|
'NEVOLILI': '#666666',
|
||
|
'NEZAPOČÍTANÉ': '#CCCCCC',
|
||
|
}
|
||
|
plt.pie(x, labels=labels, colors=[colors[i] for i in order], autopct='%1.3f %%')
|
||
|
plt.text(0, -1.2, label, ha='center')
|
||
|
|
||
|
# save
|
||
|
fn = f'progress_png/{ts}.png'
|
||
|
plt.savefig(fn)
|
||
|
|
||
|
|
||
|
zpracuj_davky(
|
||
|
sorted(glob('davky/*.xml'))
|
||
|
)
|