You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

182 lines
5.8 KiB
Python

#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2023 LEdoian <volby@pokemon.ledoian.cz>
# SPDX-License-Identifier: GPL-2.0-only
import matplotlib.pyplot as plt
import xml.etree.ElementTree as ET
from datetime import date, time, datetime
from dataclasses import dataclass
from glob import glob
from typing import Sequence
from functools import reduce
from pathlib import Path
VOLICU_CELKEM = 8_245_962
OKRSKU_CELKEM = 14_857
START = datetime.fromisoformat('2023-01-14T14:00:00')
kandidati_jmena = {
1: 'Pavel Fischer',
2: 'Jaroslav Bašta',
4: 'Petr Pavel',
5: 'Tomáš Zima',
6: 'Danuše Nerudová',
7: 'Andrej Babiš',
8: 'Karel Diviš',
9: 'Marek Hilšer',
}
@dataclass
class VysledekOkrsku:
poradi: int
timestamp: datetime
kandidati: dict[str, int]
ucast: dict
def dopln_vysledek(self):
assert self.poradi != 0, "Dávka má mít kladné pořadí"
self.kandidati['NEPLATNÉ'] = int(self.ucast['ODEVZDANE_OBALKY']) - int(self.ucast['PLATNE_HLASY'])
self.kandidati['NEVOLILI'] = int(self.ucast['ZAPSANI_VOLICI']) - int(self.ucast['ODEVZDANE_OBALKY'])
# Ztracené (vydané a neodevzdané) obálky neřešíme
# Nezapočítané hlasy neexistují na úrovni okrsku
def nacti_davku(fn) -> dict[tuple[int, int], VysledekOkrsku]:
okrsky: dict[tuple[int, int], VysledekOkrsku] = {}
with open(fn, 'r') as f:
root = ET.parse(f).getroot()
NS = {'': r'http://www.volby.cz/prezident/'}
def zpracuj_okrsek(elem) -> tuple[VysledekOkrsku, tuple[int, int]]:
idOkrsku = tuple(map(int, (elem.get(x) for x in ['CIS_OBEC', 'CIS_OKRSEK'])))
kandidati = {}
for kandidat in elem.findall('./HLASY_OKRSEK', NS):
cislo = int(kandidat.attrib['PORADOVE_CISLO'])
hlasy = int(kandidat.attrib['HLASY'])
jmeno = kandidati_jmena[cislo]
kandidati[jmeno] = hlasy
return VysledekOkrsku(
poradi = int(elem.attrib['PORADI_ZPRAC']),
timestamp = datetime.fromisoformat(elem.attrib['DATUM_CAS_ZPRAC']),
ucast = elem.find('./UCAST_OKRSEK', NS).attrib,
kandidati = kandidati,
), idOkrsku
for okrsek in root.findall('./OKRSEK', NS):
vysl, id = zpracuj_okrsek(okrsek)
vysl.dopln_vysledek()
okrsky[id] = vysl
return okrsky
@dataclass
class KolacovaData:
kandidati: dict[str, int]
timestamp: datetime
secteno_okrsku: int
def zpracuj_davky(fns: Sequence[str]):
# Zpracovávací fáze: načteme dávku, nahradíme předchozí instance pro okrsky
# Počítací fáze: sečteme hlasy za všechny okrsky, vyrobíme dump nebo koláč ke každému času
# Uložíme pod timestampem do nějaké složky.
# Tyto dvě fáze ale musí probíhat zároveň, pokud chceme mít vykreslený graf za každý okamžik předání
# Jiný program: vezme dumpy/koláče a naskládá je správně do videa (případně provede jinou analýzu)
okrsky: dict[tuple[int, int], VysledekOkrsku] = {}
celkovy_vysledek: dict[str, int] = {}
data: list[KolacovaData] = []
for fn in fns:
print(f'Zpracovávám dávku {fn}')
nove = nacti_davku(fn)
podle_casu = sorted(nove.items(), key=lambda x: x[1].timestamp)
for okrsek, vysledek_okrsku in podle_casu:
# Přidat okrsek do dat, přepočítat a exportovat výsledek
if okrsek in okrsky:
assert vysledek_okrsku.poradi > okrsky[okrsek].poradi
# Odečteme předchozí výsledek tohohle okrsku
for k in okrsky[okrsek].kandidati:
celkovy_vysledek[k] -= okrsky[okrsek].kandidati[k]
okrsky[okrsek] = vysledek_okrsku
for k in vysledek_okrsku.kandidati:
if k not in celkovy_vysledek: celkovy_vysledek[k] = 0
celkovy_vysledek[k] += vysledek_okrsku.kandidati[k]
# Počítání dat:
ts = vysledek_okrsku.timestamp
celkovy_vysledek['NEZAPOČÍTANÉ'] = 0 # del který funguje i v první iteraci :-)
celkovy_vysledek['NEZAPOČÍTANÉ'] = VOLICU_CELKEM - sum(celkovy_vysledek.values())
# Vizualizace / export
data.append(KolacovaData(
kandidati = dict(celkovy_vysledek),
timestamp = ts,
secteno_okrsku = len(okrsky.keys())
))
return data
# FIXME: ne globálním datům!
data = [KolacovaData(
kandidati={'NEZAPOČÍTANÉ': VOLICU_CELKEM},
timestamp=START,
secteno_okrsku=0,
)]
data.extend(zpracuj_davky(sorted(glob('davky/*.xml'))))
# Reálně ale kašleme na jednotlivé sekundy, takže z každé vezmeme jen poslední výsledek
po_sekundach = {}
for x in data:
po_sekundach[x.timestamp] = x
def visualize_data(data: KolacovaData):
# FIXME: Zkopírováno, negenerické, fuj. Má používat nějaký dedikovaný spoolečný kód.
plt.clf()
# Pro srovnání: zpracování okrsků
# TODO: WTF: když tohle nakreslím až po kandidátech, tak se
# kandidáti vyrenderují mimo bbox.
okrsku_zbyva = OKRSKU_CELKEM - data.secteno_okrsku
plt.pie(
[data.secteno_okrsku, okrsku_zbyva],
colors=['#000088', '#cccccc'],
center=(1, -1), radius=0.3,
)
# Zastoupení kandidátů
order = (
'Petr Pavel',
'Danuše Nerudová',
'Marek Hilšer',
'Pavel Fischer',
'Karel Diviš',
'Tomáš Zima',
'Jaroslav Bašta',
'Andrej Babiš',
'NEPLATNÉ',
'NEVOLILI',
'NEZAPOČÍTANÉ',
)
# FIXME: průběžné výsledky nemají některé kandidáty. Tohle bychom neměli opravovat tady.
x = [data.kandidati.get(i, 0) for i in order]
print(data.timestamp, x)
labels = order
label = str(data.timestamp)
colors = {
'Petr Pavel': '#627210',
'Danuše Nerudová': '#811367',
'Andrej Babiš': '#262161',
'Jaroslav Bašta': '#B51119',
'Marek Hilšer': '#CA834E',
'Pavel Fischer': '#244C76',
'Karel Diviš': '#3B6E5D',
'Tomáš Zima': '#E5DE1A',
'NEPLATNÉ': '#000000',
'NEVOLILI': '#666666',
'NEZAPOČÍTANÉ': '#CCCCCC',
}
plt.pie(x, labels=labels, colors=[colors[i] for i in order], autopct='%1.3f %%')
plt.text(0, -1.2, label, ha='center')
# save
fn = f'progress_png/{data.timestamp}.png'
plt.savefig(fn)
Path('./progress_png').mkdir(exist_ok=True)
for kolac in po_sekundach.values():
visualize_data(kolac)