Vizualizace průběhu sčítání

master
LEdoian 2 years ago
parent 5d2d940791
commit 54a0fdb65b

@ -0,0 +1,151 @@
#!/usr/bin/env python3
import matplotlib.pyplot as plt
import xml.etree.ElementTree as ET
from datetime import date, time, datetime
from dataclasses import dataclass
from glob import glob
from typing import Sequence
from functools import reduce
VOLICU_CELKEM = 8_245_962
OKRSKU_CELKEM = 14_857
kandidati_jmena = {
1: 'Pavel Fischer',
2: 'Jaroslav Bašta',
4: 'Petr Pavel',
5: 'Tomáš Zima',
6: 'Danuše Nerudová',
7: 'Andrej Babiš',
8: 'Karel Diviš',
9: 'Marek Hilšer',
}
@dataclass
class VysledekOkrsku:
poradi: int
timestamp: datetime
kandidati: dict[str, int]
ucast: dict
def dopln_vysledek(self):
assert self.poradi != 0, "Dávka má mít kladné pořadí"
self.kandidati['NEPLATNÉ'] = int(self.ucast['ODEVZDANE_OBALKY']) - int(self.ucast['PLATNE_HLASY'])
self.kandidati['NEVOLILI'] = int(self.ucast['ZAPSANI_VOLICI']) - int(self.ucast['ODEVZDANE_OBALKY'])
# Ztracené (vydané a neodevzdané) obálky neřešíme
# Nezapočítané hlasy neexistují na úrovni okrsku
def nacti_davku(fn) -> dict[tuple[int, int], VysledekOkrsku]:
okrsky: dict[tuple[int, int], VysledekOkrsku] = {}
with open(fn, 'r') as f:
root = ET.parse(f).getroot()
NS = {'': r'http://www.volby.cz/prezident/'}
def zpracuj_okrsek(elem) -> tuple[VysledekOkrsku, tuple[int, int]]:
idOkrsku = tuple(map(int, (elem.get(x) for x in ['CIS_OBEC', 'CIS_OKRSEK'])))
kandidati = {}
for kandidat in elem.findall('./HLASY_OKRSEK', NS):
cislo = int(kandidat.attrib['PORADOVE_CISLO'])
hlasy = int(kandidat.attrib['HLASY'])
jmeno = kandidati_jmena[cislo]
kandidati[jmeno] = hlasy
return VysledekOkrsku(
poradi = int(elem.attrib['PORADI_ZPRAC']),
timestamp = datetime.fromisoformat(elem.attrib['DATUM_CAS_ZPRAC']),
ucast = elem.find('./UCAST_OKRSEK', NS).attrib,
kandidati = kandidati,
), idOkrsku
for okrsek in root.findall('./OKRSEK', NS):
vysl, id = zpracuj_okrsek(okrsek)
vysl.dopln_vysledek()
okrsky[id] = vysl
return okrsky
def zpracuj_davky(fns: Sequence[str]):
# Zpracovávací fáze: načteme dávku, nahradíme předchozí instance pro okrsky
# Počítací fáze: sečteme hlasy za všechny okrsky, vyrobíme dump nebo koláč ke každému času
# Uložíme pod timestampem do nějaké složky.
# Tyto dvě fáze ale musí probíhat zároveň, pokud chceme mít vykreslený graf za každý okamžik předání
# Jiný program: vezme dumpy/koláče a naskládá je správně do videa (případně provede jinou analýzu)
okrsky: dict[tuple[int, int], VysledekOkrsku] = {}
celkovy_vysledek: dict[str, int] = {}
for fn in fns:
print(f'Zpracovávám dávku {fn}')
nove = nacti_davku(fn)
podle_casu = sorted(nove.items(), key=lambda x: x[1].timestamp)
for okrsek, vysledek_okrsku in podle_casu:
# Přidat okrsek do dat, přepočítat a exportovat výsledek
if okrsek in okrsky:
assert vysledek_okrsku.poradi > okrsky[okrsek].poradi
# Odečteme předchozí výsledek tohohle okrsku
for k in okrsky[okrsek].kandidati:
celkovy_vysledek[k] -= okrsky[okrsek].kandidati[k]
okrsky[okrsek] = vysledek_okrsku
for k in vysledek_okrsku.kandidati:
if k not in celkovy_vysledek: celkovy_vysledek[k] = 0
celkovy_vysledek[k] += vysledek_okrsku.kandidati[k]
# Počítání dat:
ts = vysledek_okrsku.timestamp
celkovy_vysledek['NEZAPOČÍTANÉ'] = 0 # del který funguje i v první iteraci :-)
celkovy_vysledek['NEZAPOČÍTANÉ'] = VOLICU_CELKEM - sum(celkovy_vysledek.values())
# Vizualizace / export
# FIXME: Zkopírováno, negenerické, fuj. Má používat nějaký dedikovaný spoolečný kód.
plt.clf()
# Pro srovnání: zpracování okrsků
# TODO: WTF: když tohle nakreslím až po kandidátech, tak se
# kandidáti vyrenderují mimo bbox.
okrsku_secteno = len(okrsky.keys())
okrsku_zbyva = OKRSKU_CELKEM - okrsku_secteno
plt.pie(
[okrsku_secteno, okrsku_zbyva],
colors=['#000088', '#cccccc'],
center=(1, -1), radius=0.3,
)
# Zastoupení kandidátů
order = (
'Petr Pavel',
'Danuše Nerudová',
'Marek Hilšer',
'Pavel Fischer',
'Karel Diviš',
'Tomáš Zima',
'Jaroslav Bašta',
'Andrej Babiš',
'NEPLATNÉ',
'NEVOLILI',
'NEZAPOČÍTANÉ',
)
# FIXME: průběžné výsledky nemají některé kandidáty. Tohle bychom neměli opravovat tady.
x = [celkovy_vysledek.get(i, 0) for i in order]
print(ts, x)
labels = order
label = str(ts)
colors = {
'Petr Pavel': '#627210',
'Danuše Nerudová': '#811367',
'Andrej Babiš': '#262161',
'Jaroslav Bašta': '#B51119',
'Marek Hilšer': '#CA834E',
'Pavel Fischer': '#244C76',
'Karel Diviš': '#3B6E5D',
'Tomáš Zima': '#E5DE1A',
'NEPLATNÉ': '#000000',
'NEVOLILI': '#666666',
'NEZAPOČÍTANÉ': '#CCCCCC',
}
plt.pie(x, labels=labels, colors=[colors[i] for i in order], autopct='%1.3f %%')
plt.text(0, -1.2, label, ha='center')
# save
fn = f'progress_png/{ts}.png'
plt.savefig(fn)
zpracuj_davky(
sorted(glob('davky/*.xml'))
)

@ -0,0 +1,205 @@
#!/usr/bin/env python3
import matplotlib.pyplot as plt
import xml.etree.ElementTree as ET
from datetime import date, time, datetime, timedelta
from dataclasses import dataclass
from glob import glob
from typing import Sequence
from functools import reduce
VOLICU_CELKEM = 8_245_962
OKRSKU_CELKEM = 14_857
START = datetime.fromisoformat('2023-01-14T14:00:00')
kandidati_jmena = {
1: 'Pavel Fischer',
2: 'Jaroslav Bašta',
4: 'Petr Pavel',
5: 'Tomáš Zima',
6: 'Danuše Nerudová',
7: 'Andrej Babiš',
8: 'Karel Diviš',
9: 'Marek Hilšer',
}
@dataclass
class VysledekOkrsku:
poradi: int
timestamp: datetime
kandidati: dict[str, int]
ucast: dict
def dopln_vysledek(self):
assert self.poradi != 0, "Dávka má mít kladné pořadí"
self.kandidati['NEPLATNÉ'] = int(self.ucast['ODEVZDANE_OBALKY']) - int(self.ucast['PLATNE_HLASY'])
self.kandidati['NEVOLILI'] = int(self.ucast['ZAPSANI_VOLICI']) - int(self.ucast['ODEVZDANE_OBALKY'])
# Ztracené (vydané a neodevzdané) obálky neřešíme
# Nezapočítané hlasy neexistují na úrovni okrsku
def nacti_davku(fn) -> dict[tuple[int, int], VysledekOkrsku]:
okrsky: dict[tuple[int, int], VysledekOkrsku] = {}
with open(fn, 'r') as f:
root = ET.parse(f).getroot()
NS = {'': r'http://www.volby.cz/prezident/'}
def zpracuj_okrsek(elem) -> tuple[VysledekOkrsku, tuple[int, int]]:
idOkrsku = tuple(map(int, (elem.get(x) for x in ['CIS_OBEC', 'CIS_OKRSEK'])))
kandidati = {}
for kandidat in elem.findall('./HLASY_OKRSEK', NS):
cislo = int(kandidat.attrib['PORADOVE_CISLO'])
hlasy = int(kandidat.attrib['HLASY'])
jmeno = kandidati_jmena[cislo]
kandidati[jmeno] = hlasy
return VysledekOkrsku(
poradi = int(elem.attrib['PORADI_ZPRAC']),
timestamp = datetime.fromisoformat(elem.attrib['DATUM_CAS_ZPRAC']),
ucast = elem.find('./UCAST_OKRSEK', NS).attrib,
kandidati = kandidati,
), idOkrsku
for okrsek in root.findall('./OKRSEK', NS):
vysl, id = zpracuj_okrsek(okrsek)
vysl.dopln_vysledek()
okrsky[id] = vysl
return okrsky
@dataclass
class KolacovaData:
kandidati: dict[str, int]
timestamp: datetime
secteno_okrsku: int
def zpracuj_davky(fns: Sequence[str]):
# Zpracovávací fáze: načteme dávku, nahradíme předchozí instance pro okrsky
# Počítací fáze: sečteme hlasy za všechny okrsky, vyrobíme dump nebo koláč ke každému času
# Uložíme pod timestampem do nějaké složky.
# Tyto dvě fáze ale musí probíhat zároveň, pokud chceme mít vykreslený graf za každý okamžik předání
# Jiný program: vezme dumpy/koláče a naskládá je správně do videa (případně provede jinou analýzu)
okrsky: dict[tuple[int, int], VysledekOkrsku] = {}
celkovy_vysledek: dict[str, int] = {}
data: list[KolacovaData] = []
for fn in fns:
print(f'Zpracovávám dávku {fn}')
nove = nacti_davku(fn)
podle_casu = sorted(nove.items(), key=lambda x: x[1].timestamp)
for okrsek, vysledek_okrsku in podle_casu:
# Přidat okrsek do dat, přepočítat a exportovat výsledek
if okrsek in okrsky:
assert vysledek_okrsku.poradi > okrsky[okrsek].poradi
# Odečteme předchozí výsledek tohohle okrsku
for k in okrsky[okrsek].kandidati:
celkovy_vysledek[k] -= okrsky[okrsek].kandidati[k]
okrsky[okrsek] = vysledek_okrsku
for k in vysledek_okrsku.kandidati:
if k not in celkovy_vysledek: celkovy_vysledek[k] = 0
celkovy_vysledek[k] += vysledek_okrsku.kandidati[k]
# Počítání dat:
ts = vysledek_okrsku.timestamp
celkovy_vysledek['NEZAPOČÍTANÉ'] = 0 # del který funguje i v první iteraci :-)
celkovy_vysledek['NEZAPOČÍTANÉ'] = VOLICU_CELKEM - sum(celkovy_vysledek.values())
# Vizualizace / export
data.append(KolacovaData(
kandidati = dict(celkovy_vysledek),
timestamp = ts,
secteno_okrsku = len(okrsky.keys())
))
return data
# FIXME: ne globálním datům!
data = [KolacovaData(
kandidati={'NEZAPOČÍTANÉ': VOLICU_CELKEM},
timestamp=START,
secteno_okrsku=0,
)]
data.extend(zpracuj_davky(sorted(glob('davky/*.xml'))))
# Reálně ale kašleme na jednotlivé sekundy, takže z každé vezmeme jen poslední výsledek
po_sekundach = {}
for x in data:
po_sekundach[x.timestamp] = x
casy = sorted(po_sekundach.keys())
def frametime_to_realtime(t):
# Až na úplný konec tak sčítání těchto voleb trvalo asi 3 hodiny. Řekněme,
# že cílíme na cca 10minutové video, tedy zrychlení 18x.
delta = timedelta(minutes=t.minute, hours=t.hour, seconds=t.second, microseconds=t.microsecond)
delta *= 18
return START + delta
from bisect import bisect_right
def data_for_timestamp(ts) -> KolacovaData:
idx = bisect_right(casy, ts)
return po_sekundach[casy[idx-1]] # bisect_right vrací konzistentně hodnotu o jedna větší, než potřebujeme.
def visualize_data(data):
# Tady se provede ta vizualizace, kterou děláme pořád dokolečka.
# FIXME: zase jsem to zkopíroval :-P
plt.clf()
# Pro srovnání: zpracování okrsků
# TODO: WTF: když tohle nakreslím až po kandidátech, tak se
# kandidáti vyrenderují mimo bbox.
okrsku_zbyva = OKRSKU_CELKEM - data.secteno_okrsku
plt.pie(
[data.secteno_okrsku, okrsku_zbyva],
colors=['#000088', '#cccccc'],
center=(1, -1), radius=0.3,
)
# Zastoupení kandidátů
order = (
'Petr Pavel',
'Danuše Nerudová',
'Marek Hilšer',
'Pavel Fischer',
'Karel Diviš',
'Tomáš Zima',
'Jaroslav Bašta',
'Andrej Babiš',
'NEPLATNÉ',
'NEVOLILI',
'NEZAPOČÍTANÉ',
)
# FIXME: průběžné výsledky nemají některé kandidáty. Tohle bychom neměli opravovat tady.
x = [data.kandidati.get(i, 0) for i in order]
labels = order
label = str(data.timestamp)
colors = {
'Petr Pavel': '#627210',
'Danuše Nerudová': '#811367',
'Andrej Babiš': '#262161',
'Jaroslav Bašta': '#B51119',
'Marek Hilšer': '#CA834E',
'Pavel Fischer': '#244C76',
'Karel Diviš': '#3B6E5D',
'Tomáš Zima': '#E5DE1A',
'NEPLATNÉ': '#000000',
'NEVOLILI': '#666666',
'NEZAPOČÍTANÉ': '#CCCCCC',
}
plt.pie(x, labels=labels, colors=[colors[i] for i in order], autopct='%1.3f %%')
plt.text(0, -1.2, label, ha='center')
# save
return plt.gcf()
# https://zulko.github.io/moviepy/getting_started/working_with_matplotlib.html
from moviepy.editor import VideoClip
from moviepy.video.io.bindings import mplfig_to_npimage
def make_frame(t):
# wtf is t?
data: KolacovaData
#data = data_for_timestamp(frametime_to_realtime(t))
data = data_for_timestamp(START + timedelta(seconds=t)*18)
fig = visualize_data(data)
return mplfig_to_npimage(fig)
def make_anim():
anim = VideoClip(make_frame, duration=600)
anim.write_videofile('progress.mkv', codec='hevc', fps=24)
make_anim()
Loading…
Cancel
Save