Skip to content

Commit

Permalink
Backport #513 (importing with timezones)
Browse files Browse the repository at this point in the history
  • Loading branch information
geier committed Oct 10, 2016
1 parent a66c91d commit fdf40e3
Show file tree
Hide file tree
Showing 6 changed files with 402 additions and 68 deletions.
83 changes: 75 additions & 8 deletions khal/aux.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@

from .compat import to_unicode

from datetime import date, datetime, timedelta
from datetime import time as dtime
from datetime import date, datetime, timedelta
from collections import defaultdict
import random
import string

Expand Down Expand Up @@ -393,19 +394,85 @@ def new_event(dtstart=None, dtend=None, summary=None, timezone=None,
return event


def ics_from_list(vevent, random_uid=False):
"""convert an iterable of icalendar.Event to an icalendar.Calendar
def split_ics(ics, random_uid=False):
"""split an ics string into several according to VEVENT's UIDs
and sort the right VTIMEZONEs accordingly
ignores all other ics components
:type ics: str
:param random_uid: assign random uids to all events
:type random_uid: bool
:rtype list:
"""
cal = icalendar.Calendar.from_ical(ics)
tzs = {item['TZID']: item for item in cal.walk() if item.name == 'VTIMEZONE'}

events_grouped = defaultdict(list)
for item in cal.walk():
if item.name == 'VEVENT':
events_grouped[item['UID']].append(item)
else:
continue
return [ics_from_list(events, tzs, random_uid) for uid, events in
sorted(events_grouped.items())]


def ics_from_list(events, tzs, random_uid=False):
"""convert an iterable of icalendar.Events to an icalendar.Calendar
:param random_uid: asign the same random UID to all events
:params events: list of events all with the same uid
:type events: list(icalendar.cal.Event)
:param random_uid: assign random uids to all events
:type random_uid: bool
:param tzs: collection of timezones
:type tzs: dict(icalendar.cal.Vtimzone
"""
calendar = icalendar.Calendar()
calendar.add('version', '2.0')
calendar.add('prodid', '-//CALENDARSERVER.ORG//NONSGML Version 1//EN')

if random_uid:
new_uid = icalendar.vText(generate_random_uid())
for sub_event in vevent:
new_uid = generate_random_uid()

needed_tz, missing_tz = set(), set()
for sub_event in events:
if random_uid:
sub_event['uid'] = new_uid
sub_event['UID'] = new_uid
# icalendar round-trip converts `TZID=a b` to `TZID="a b"` investigate, file bug XXX
for prop in ['DTSTART', 'DTEND', 'DUE', 'EXDATE', 'RDATE', 'RECURRENCE-ID', 'DUE']:
if isinstance(sub_event.get(prop), list):
items = sub_event.get(prop)
else:
items = [sub_event.get(prop)]

for item in items:
if not (hasattr(item, 'dt') or hasattr(item, 'dts')):
continue
# if prop is a list, all items have the same parameters
datetime_ = item.dts[0].dt if hasattr(item, 'dts') else item.dt

if not hasattr(datetime_, 'tzinfo'):
continue

# check for datetimes' timezones which are not understood by
# icalendar
if datetime_.tzinfo is None and 'TZID' in item.params and \
item.params['TZID'] not in missing_tz:
logger.warn(
'Cannot find timezone `{}` in .ics file, using default timezone. '
'This can lead to erroneous time shifts'.format(item.params['TZID'])
)
missing_tz.add(item.params['TZID'])
elif datetime_.tzinfo != pytz.UTC:
needed_tz.add(datetime_.tzinfo)

for tzid in needed_tz:
if str(tzid) in tzs:
calendar.add_component(tzs[str(tzid)])
else:
logger.warn(
'Cannot find timezone `{}` in .ics file, this could be a bug, '
'please report this issue at http://github.com/pimutils/khal/.'.format(tzid))
for sub_event in events:
calendar.add_component(sub_event)
return calendar
return calendar.to_ical().decode('utf-8')
69 changes: 29 additions & 40 deletions khal/controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
from click import confirm, echo, style, prompt
from vdirsyncer.utils.vobject import Item

from collections import defaultdict

import datetime
import itertools
import logging
Expand All @@ -38,7 +36,6 @@
from khal.khalendar.exceptions import ReadOnlyCalendarError, DuplicateUid
from khal.exceptions import InvalidDate, FatalError
from khal.khalendar.event import Event
from khal.khalendar.backend import sort_key
from khal import __version__, __productname__
from khal.log import logger
from .terminal import colored, get_terminal_size, merge_columns
Expand Down Expand Up @@ -219,63 +216,55 @@ def import_ics(collection, conf, ics, batch=False, random_uid=False):
even when an event with the same uid already exists
:type batch: bool
"""
cal = icalendar.Calendar.from_ical(ics)
events = [item for item in cal.walk() if item.name == 'VEVENT']
events_grouped = defaultdict(list)
for event in events:
events_grouped[event['UID']].append(event)

vevents = list()
for uid in events_grouped:
vevents.append(sorted(events_grouped[uid], key=sort_key))
vevents = aux.split_ics(ics, random_uid)
for vevent in vevents:
import_event(vevent, collection, conf['locale'], batch, random_uid)
import_event(vevent, collection, conf['locale'], batch)


def import_event(vevent, collection, locale, batch, random_uid):
"""import one event into collection, let user choose the collection"""
def import_event(vevent, collection, locale, batch, format=None, env=None):
"""import one event into collection, let user choose the collection
:type vevent: list of vevents, which can be more than one VEVENT, i.e., the
same UID, i.e., one "master" event and (optionally) 1+ RECURRENCE-ID events
:type vevent: list(str)
"""
# print all sub-events
for sub_event in vevent:
if not batch:
event = Event.fromVEvents(
[sub_event], calendar=collection.default_calendar_name, locale=locale)
echo(event.event_description)
if not batch:
for item in icalendar.Calendar.from_ical(vevent).walk():
if item.name == 'VEVENT':
event = Event.fromVEvents(
[item], calendar=collection.default_calendar_name, locale=locale)
echo(event.event_description)

# get the calendar to insert into
if batch or len(collection.writable_names) == 1:
calendar_name = collection.writable_names[0]
else:
choice = list()
for num, name in enumerate(collection.writable_names):
choice.append(u'{}({})'.format(name, num))
choice = u', '.join(choice)
calendar_names = sorted(collection.writable_names)
choices = ', '.join(
['{}({})'.format(name, num) for num, name in enumerate(calendar_names)])
while True:
value = prompt(u'Which calendar do you want to import to? \n'
u'{}'.format(choice), default=collection.default_calendar_name)
value = prompt(
u"Which calendar do you want to import to? (unique prefixes are fine)\n"
u"{}".format(choices),
default=collection.default_calendar_name,
)
try:
number = int(value)
calendar_name = collection.writable_names[number]
calendar_name = calendar_names[int(value)]
break
except (ValueError, IndexError):
matches = filter(lambda x: x.startswith(value), collection.writable_names)
matches = [x for x in collection.writable_names if x.startswith(value)]
if len(matches) == 1:
calendar_name = matches[0]
break
echo(u'invalid choice')

if batch or confirm(u"Do you want to import this event into `{}`?"
u"".format(calendar_name)):
ics = aux.ics_from_list(vevent, random_uid)
if batch or confirm(u"Do you want to import this event into `{}`?".format(calendar_name)):
try:
collection.new(
Item(ics.to_ical().decode('utf-8')),
collection=calendar_name)
collection.new(Item(vevent), collection=calendar_name)
except DuplicateUid:
if batch or confirm(u"An event with the same UID already exists. "
u"Do you want to update it?"):
collection.force_update(
Item(ics.to_ical().decode('utf-8')),
collection=calendar_name)
if batch or confirm(
u"An event with the same UID already exists. Do you want to update it?"):
collection.force_update(Item(vevent), collection=calendar_name)
else:
logger.warn(u"Not importing event with UID `{}`".format(event.uid))
68 changes: 48 additions & 20 deletions tests/aux_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""testing functions from the khal.aux"""
from datetime import date, datetime, time, timedelta
import textwrap
import random

import icalendar
import pytz
Expand All @@ -10,8 +11,7 @@
from khal import aux
from khal.compat import to_bytes

from .aux import _get_all_vevents_file, _get_text, \
normalize_component
from .aux import _get_text, normalize_component


def _now():
Expand Down Expand Up @@ -49,6 +49,11 @@ def _replace_uid(event):
return event


def _get_TZIDs(lines):
"""from a list of strings, get all unique strings that start with TZID"""
return sorted((line for line in lines if line.startswith('TZID')))


def test_normalize_component():
assert normalize_component(textwrap.dedent("""
BEGIN:VEVENT
Expand Down Expand Up @@ -342,21 +347,44 @@ def test_description_and_location():
assert _replace_uid(event).to_ical() == vevent


class TestIcsFromList(object):

def test_ics_from_list(self):
vevents = _get_all_vevents_file('event_rrule_recuid')
cal = aux.ics_from_list(list(vevents))
assert normalize_component(cal.to_ical()) == \
normalize_component(_get_text('event_rrule_recuid'))

def test_ics_from_list_random_uid(self):
vevents = _get_all_vevents_file('event_rrule_recuid')
cal = aux.ics_from_list(list(vevents), random_uid=True)
normalize_component(cal.to_ical())
vevents = [item for item in cal.walk() if item.name == 'VEVENT']
uids = set()
for event in vevents:
uids.add(event['UID'])
assert len(uids) == 1
assert event['UID'] != icalendar.vText('event_rrule_recurrence_id')
def test_split_ics():
cal = _get_text('cal_lots_of_timezones')
vevents = aux.split_ics(cal)

vevents0 = vevents[0].split('\r\n')
vevents1 = vevents[1].split('\r\n')

part0 = _get_text('part0').split('\n')
part1 = _get_text('part1').split('\n')

assert _get_TZIDs(vevents0) == _get_TZIDs(part0)
assert _get_TZIDs(vevents1) == _get_TZIDs(part1)

assert sorted(vevents0) == sorted(part0)
assert sorted(vevents1) == sorted(part1)


def test_split_ics_random_uid():
random.seed(123)
cal = _get_text('cal_lots_of_timezones')
vevents = aux.split_ics(cal, random_uid=True)

part0 = _get_text('part0').split('\n')
part1 = _get_text('part1').split('\n')

for item in icalendar.Calendar.from_ical(vevents[0]).walk():
if item.name == 'VEVENT':
assert item['UID'] == 'DRF0RGCY89VVDKIV9VPKA1FYEAU2GCFJIBS1'
for item in icalendar.Calendar.from_ical(vevents[1]).walk():
if item.name == 'VEVENT':
assert item['UID'] == '4Q4CTV74N7UAZ618570X6CLF5QKVV9ZE3YVB'

# after replacing the UIDs, everything should be as above
vevents0 = vevents[0].replace('DRF0RGCY89VVDKIV9VPKA1FYEAU2GCFJIBS1', '123').split('\r\n')
vevents1 = vevents[1].replace('4Q4CTV74N7UAZ618570X6CLF5QKVV9ZE3YVB', 'abcde').split('\r\n')

assert _get_TZIDs(vevents0) == _get_TZIDs(part0)
assert _get_TZIDs(vevents1) == _get_TZIDs(part1)

assert sorted(vevents0) == sorted(part0)
assert sorted(vevents1) == sorted(part1)
Loading

0 comments on commit fdf40e3

Please sign in to comment.