"""Conversation objects."""
import asyncio
import datetime
import logging
from hangups import (parsers, event, user, conversation_event, exceptions,
hangouts_pb2)
logger = logging.getLogger(__name__)
CONVERSATIONS_PER_REQUEST = 100
MAX_CONVERSATION_PAGES = 100
[docs]async def build_user_conversation_list(client):
"""Build :class:`.UserList` and :class:`.ConversationList`.
This method requests data necessary to build the list of conversations and
users. Users that are not in the contact list but are participating in a
conversation will also be retrieved.
Args:
client (Client): Connected client.
Returns:
(:class:`.UserList`, :class:`.ConversationList`):
Tuple of built objects.
"""
conv_states, sync_timestamp = await _sync_all_conversations(client)
# Retrieve entities participating in all conversations.
required_user_ids = set()
for conv_state in conv_states:
required_user_ids |= {
user.UserID(chat_id=part.id.chat_id, gaia_id=part.id.gaia_id)
for part in conv_state.conversation.participant_data
}
required_entities = []
if required_user_ids:
logger.debug('Need to request additional users: {}'
.format(required_user_ids))
try:
response = await client.get_entity_by_id(
hangouts_pb2.GetEntityByIdRequest(
request_header=client.get_request_header(),
batch_lookup_spec=[
hangouts_pb2.EntityLookupSpec(
gaia_id=user_id.gaia_id,
create_offnetwork_gaia=True,
)
for user_id in required_user_ids
],
)
)
for entity_result in response.entity_result:
required_entities.extend(entity_result.entity)
except exceptions.NetworkError as e:
logger.warning('Failed to request missing users: {}'.format(e))
# Build list of conversation participants.
conv_part_list = []
for conv_state in conv_states:
conv_part_list.extend(conv_state.conversation.participant_data)
# Retrieve self entity.
get_self_info_response = await client.get_self_info(
hangouts_pb2.GetSelfInfoRequest(
request_header=client.get_request_header(),
)
)
self_entity = get_self_info_response.self_entity
user_list = user.UserList(client, self_entity, required_entities,
conv_part_list)
conversation_list = ConversationList(client, conv_states,
user_list, sync_timestamp)
return (user_list, conversation_list)
async def _sync_all_conversations(client):
"""Sync all conversations by making paginated requests.
Conversations are ordered by ascending sort timestamp.
Args:
client (Client): Connected client.
Raises:
NetworkError: If the requests fail.
Returns:
tuple of list of ``ConversationState`` messages and sync timestamp
"""
conv_states = []
sync_timestamp = None
request = hangouts_pb2.SyncRecentConversationsRequest(
request_header=client.get_request_header(),
max_conversations=CONVERSATIONS_PER_REQUEST,
max_events_per_conversation=1,
sync_filter=[
hangouts_pb2.SYNC_FILTER_INBOX,
hangouts_pb2.SYNC_FILTER_ARCHIVED,
]
)
for _ in range(MAX_CONVERSATION_PAGES):
logger.info(
'Requesting conversations page %s', request.last_event_timestamp
)
response = await client.sync_recent_conversations(request)
conv_states = list(response.conversation_state) + conv_states
sync_timestamp = parsers.from_timestamp(
# SyncRecentConversations seems to return a sync_timestamp 4
# minutes before the present. To prevent SyncAllNewEvents later
# breaking requesting events older than what we already have, use
# current_server_time instead.
response.response_header.current_server_time
)
if response.continuation_end_timestamp == 0:
logger.info('Reached final conversations page')
break
else:
request.last_event_timestamp = response.continuation_end_timestamp
else:
logger.warning('Exceeded maximum number of conversation pages')
logger.info('Synced %s total conversations', len(conv_states))
return conv_states, sync_timestamp
[docs]class Conversation(object):
"""A single chat conversation.
Use :class:`.ConversationList` methods to get instances of this class.
"""
def __init__(self, client, user_list, conversation, events=[]):
# pylint: disable=dangerous-default-value
self._client = client # Client
self._user_list = user_list # UserList
self._conversation = conversation # hangouts_pb2.Conversation
self._events = [] # [hangouts_pb2.Event]
self._events_dict = {} # {event_id: ConversationEvent}
self._send_message_lock = asyncio.Lock()
for event_ in events:
# Workaround to ignore observed events returned from
# syncrecentconversations.
if event_.event_type != hangouts_pb2.EVENT_TYPE_OBSERVED_EVENT:
self.add_event(event_)
self.on_event = event.Event('Conversation.on_event')
"""
:class:`.Event` fired when an event occurs in this conversation.
Args:
conv_event: :class:`.ConversationEvent` that occurred.
"""
self.on_typing = event.Event('Conversation.on_typing')
"""
:class:`.Event` fired when a users starts or stops typing in this
conversation.
Args:
typing_message: :class:`~hangups.parsers.TypingStatusMessage` that
occurred.
"""
self.on_watermark_notification = event.Event(
'Conversation.on_watermark_notification'
)
"""
:class:`.Event` fired when a watermark (read timestamp) is updated for
this conversation.
Args:
watermark_notification:
:class:`~hangups.parsers.WatermarkNotification` that occurred.
"""
self.on_watermark_notification.add_observer(
self._on_watermark_notification
)
@property
def id_(self):
"""The conversation's ID (:class:`str`)."""
return self._conversation.conversation_id.id
@property
def users(self):
"""List of conversation participants (:class:`~hangups.user.User`)."""
return [self._user_list.get_user(user.UserID(chat_id=part.id.chat_id,
gaia_id=part.id.gaia_id))
for part in self._conversation.participant_data]
@property
def name(self):
"""The conversation's custom name (:class:`str`)
May be ``None`` if conversation has no custom name.
"""
custom_name = self._conversation.name
return None if custom_name == '' else custom_name
@property
def last_modified(self):
"""When conversation was last modified (:class:`datetime.datetime`)."""
timestamp = self._conversation.self_conversation_state.sort_timestamp
# timestamp can be None for some reason when there is an ongoing video
# hangout
if timestamp is None:
timestamp = 0
return parsers.from_timestamp(timestamp)
@property
def latest_read_timestamp(self):
"""Timestamp of latest read event (:class:`datetime.datetime`)."""
timestamp = (self._conversation.self_conversation_state.
self_read_state.latest_read_timestamp)
return parsers.from_timestamp(timestamp)
@property
def events(self):
"""Loaded events sorted oldest to newest.
(list of :class:`.ConversationEvent`).
"""
return list(self._events)
@property
def unread_events(self):
"""Loaded events which are unread sorted oldest to newest.
Some Hangouts clients don't update the read timestamp for certain event
types, such as membership changes, so this may return more unread
events than these clients will show. There's also a delay between
sending a message and the user's own message being considered read.
(list of :class:`.ConversationEvent`).
"""
return [conv_event for conv_event in self._events
if conv_event.timestamp > self.latest_read_timestamp]
@property
def is_archived(self):
"""``True`` if this conversation has been archived."""
return (hangouts_pb2.CONVERSATION_VIEW_ARCHIVED in
self._conversation.self_conversation_state.view)
@property
def is_quiet(self):
"""``True`` if notification level for this conversation is quiet."""
level = self._conversation.self_conversation_state.notification_level
return level == hangouts_pb2.NOTIFICATION_LEVEL_QUIET
@property
def is_off_the_record(self):
"""``True`` if conversation is off the record (history is disabled)."""
status = self._conversation.otr_status
return status == hangouts_pb2.OFF_THE_RECORD_STATUS_OFF_THE_RECORD
def _on_watermark_notification(self, notif):
"""Update the conversations latest_read_timestamp."""
if self.get_user(notif.user_id).is_self:
logger.info('latest_read_timestamp for {} updated to {}'
.format(self.id_, notif.read_timestamp))
self_conversation_state = (
self._conversation.self_conversation_state
)
self_conversation_state.self_read_state.latest_read_timestamp = (
parsers.to_timestamp(notif.read_timestamp)
)
[docs] def update_conversation(self, conversation):
"""Update the internal state of the conversation.
This method is used by :class:`.ConversationList` to maintain this
instance.
Args:
conversation: ``Conversation`` message.
"""
# StateUpdate.conversation is actually a delta; fields that aren't
# specified are assumed to be unchanged. Until this class is
# refactored, hide this by saving and restoring previous values where
# necessary.
new_state = conversation.self_conversation_state
old_state = self._conversation.self_conversation_state
self._conversation = conversation
# delivery_medium_option
if not new_state.delivery_medium_option:
new_state.delivery_medium_option.extend(
old_state.delivery_medium_option
)
# latest_read_timestamp
old_timestamp = old_state.self_read_state.latest_read_timestamp
new_timestamp = new_state.self_read_state.latest_read_timestamp
if new_timestamp == 0:
new_state.self_read_state.latest_read_timestamp = old_timestamp
@staticmethod
def _wrap_event(event_):
"""Wrap hangouts_pb2.Event in ConversationEvent subclass."""
cls = conversation_event.ConversationEvent
if event_.HasField('chat_message'):
cls = conversation_event.ChatMessageEvent
elif event_.HasField('otr_modification'):
cls = conversation_event.OTREvent
elif event_.HasField('conversation_rename'):
cls = conversation_event.RenameEvent
elif event_.HasField('membership_change'):
cls = conversation_event.MembershipChangeEvent
elif event_.HasField('hangout_event'):
cls = conversation_event.HangoutEvent
elif event_.HasField('group_link_sharing_modification'):
cls = conversation_event.GroupLinkSharingModificationEvent
return cls(event_)
[docs] def add_event(self, event_):
"""Add an event to the conversation.
This method is used by :class:`.ConversationList` to maintain this
instance.
Args:
event_: ``Event`` message.
Returns:
:class:`.ConversationEvent` representing the event.
"""
conv_event = self._wrap_event(event_)
if conv_event.id_ not in self._events_dict:
self._events.append(conv_event)
self._events_dict[conv_event.id_] = conv_event
else:
# If this happens, there's probably a bug.
logger.info('Conversation %s ignoring duplicate event %s',
self.id_, conv_event.id_)
return None
return conv_event
[docs] def get_user(self, user_id):
"""Get user by its ID.
Args:
user_id (~hangups.user.UserID): ID of user to return.
Raises:
KeyError: If the user ID is not found.
Returns:
:class:`~hangups.user.User` with matching ID.
"""
return self._user_list.get_user(user_id)
def _get_default_delivery_medium(self):
"""Return default DeliveryMedium to use for sending messages.
Use the first option, or an option that's marked as the current
default.
"""
medium_options = (
self._conversation.self_conversation_state.delivery_medium_option
)
try:
default_medium = medium_options[0].delivery_medium
except IndexError:
logger.warning('Conversation %r has no delivery medium', self.id_)
default_medium = hangouts_pb2.DeliveryMedium(
medium_type=hangouts_pb2.DELIVERY_MEDIUM_BABEL
)
for medium_option in medium_options:
if medium_option.current_default:
default_medium = medium_option.delivery_medium
return default_medium
def _get_event_request_header(self):
"""Return EventRequestHeader for conversation."""
otr_status = (hangouts_pb2.OFF_THE_RECORD_STATUS_OFF_THE_RECORD
if self.is_off_the_record else
hangouts_pb2.OFF_THE_RECORD_STATUS_ON_THE_RECORD)
return hangouts_pb2.EventRequestHeader(
conversation_id=hangouts_pb2.ConversationId(id=self.id_),
client_generated_id=self._client.get_client_generated_id(),
expected_otr=otr_status,
delivery_medium=self._get_default_delivery_medium(),
)
[docs] async def send_message(self, segments, image_file=None, image_id=None,
image_user_id=None):
"""Send a message to this conversation.
A per-conversation lock is acquired to ensure that messages are sent in
the correct order when this method is called multiple times
asynchronously.
Args:
segments: List of :class:`.ChatMessageSegment` objects to include
in the message.
image_file: (optional) File-like object containing an image to be
attached to the message.
image_id: (optional) ID of an Picasa photo to be attached to the
message. If you specify both ``image_file`` and ``image_id``
together, ``image_file`` takes precedence and ``image_id`` will
be ignored.
image_user_id: (optional) Picasa user ID, required only if
``image_id`` refers to an image from a different Picasa user,
such as Google's sticker user.
Raises:
.NetworkError: If the message cannot be sent.
"""
async with self._send_message_lock:
if image_file:
try:
uploaded_image = await self._client.upload_image(
image_file, return_uploaded_image=True
)
except exceptions.NetworkError as e:
logger.warning('Failed to upload image: {}'.format(e))
raise
image_id = uploaded_image.image_id
try:
request = hangouts_pb2.SendChatMessageRequest(
request_header=self._client.get_request_header(),
event_request_header=self._get_event_request_header(),
message_content=hangouts_pb2.MessageContent(
segment=[seg.serialize() for seg in segments],
),
)
if image_id is not None:
request.existing_media.photo.photo_id = image_id
if image_user_id is not None:
request.existing_media.photo.user_id = image_user_id
request.existing_media.photo.is_custom_user_id = True
await self._client.send_chat_message(request)
except exceptions.NetworkError as e:
logger.warning('Failed to send message: {}'.format(e))
raise
[docs] async def leave(self):
"""Leave this conversation.
Raises:
.NetworkError: If conversation cannot be left.
"""
is_group_conversation = (self._conversation.type ==
hangouts_pb2.CONVERSATION_TYPE_GROUP)
try:
if is_group_conversation:
await self._client.remove_user(
hangouts_pb2.RemoveUserRequest(
request_header=self._client.get_request_header(),
event_request_header=self._get_event_request_header(),
)
)
else:
await self._client.delete_conversation(
hangouts_pb2.DeleteConversationRequest(
request_header=self._client.get_request_header(),
conversation_id=hangouts_pb2.ConversationId(
id=self.id_
),
delete_upper_bound_timestamp=parsers.to_timestamp(
datetime.datetime.now(tz=datetime.timezone.utc)
)
)
)
except exceptions.NetworkError as e:
logger.warning('Failed to leave conversation: {}'.format(e))
raise
[docs] async def rename(self, name):
"""Rename this conversation.
Hangouts only officially supports renaming group conversations, so
custom names for one-to-one conversations may or may not appear in all
first party clients.
Args:
name (str): New name.
Raises:
.NetworkError: If conversation cannot be renamed.
"""
await self._client.rename_conversation(
hangouts_pb2.RenameConversationRequest(
request_header=self._client.get_request_header(),
new_name=name,
event_request_header=self._get_event_request_header(),
)
)
[docs] async def set_notification_level(self, level):
"""Set the notification level of this conversation.
Args:
level: ``NOTIFICATION_LEVEL_QUIET`` to disable notifications, or
``NOTIFICATION_LEVEL_RING`` to enable them.
Raises:
.NetworkError: If the request fails.
"""
await self._client.set_conversation_notification_level(
hangouts_pb2.SetConversationNotificationLevelRequest(
request_header=self._client.get_request_header(),
conversation_id=hangouts_pb2.ConversationId(id=self.id_),
level=level,
)
)
[docs] async def set_typing(self, typing=hangouts_pb2.TYPING_TYPE_STARTED):
"""Set your typing status in this conversation.
Args:
typing: (optional) ``TYPING_TYPE_STARTED``, ``TYPING_TYPE_PAUSED``,
or ``TYPING_TYPE_STOPPED`` to start, pause, or stop typing,
respectively. Defaults to ``TYPING_TYPE_STARTED``.
Raises:
.NetworkError: If typing status cannot be set.
"""
# TODO: Add rate-limiting to avoid unnecessary requests.
try:
await self._client.set_typing(
hangouts_pb2.SetTypingRequest(
request_header=self._client.get_request_header(),
conversation_id=hangouts_pb2.ConversationId(id=self.id_),
type=typing,
)
)
except exceptions.NetworkError as e:
logger.warning('Failed to set typing status: {}'.format(e))
raise
[docs] async def update_read_timestamp(self, read_timestamp=None):
"""Update the timestamp of the latest event which has been read.
This method will avoid making an API request if it will have no effect.
Args:
read_timestamp (datetime.datetime): (optional) Timestamp to set.
Defaults to the timestamp of the newest event.
Raises:
.NetworkError: If the timestamp cannot be updated.
"""
if read_timestamp is None:
read_timestamp = (self.events[-1].timestamp if self.events else
datetime.datetime.now(datetime.timezone.utc))
if read_timestamp > self.latest_read_timestamp:
logger.info(
'Setting {} latest_read_timestamp from {} to {}'
.format(self.id_, self.latest_read_timestamp, read_timestamp)
)
# Prevent duplicate requests by updating the conversation now.
state = self._conversation.self_conversation_state
state.self_read_state.latest_read_timestamp = (
parsers.to_timestamp(read_timestamp)
)
try:
await self._client.update_watermark(
hangouts_pb2.UpdateWatermarkRequest(
request_header=self._client.get_request_header(),
conversation_id=hangouts_pb2.ConversationId(
id=self.id_
),
last_read_timestamp=parsers.to_timestamp(
read_timestamp
),
)
)
except exceptions.NetworkError as e:
logger.warning('Failed to update read timestamp: {}'.format(e))
raise
[docs] async def get_events(self, event_id=None, max_events=50):
"""Get events from this conversation.
Makes a request to load historical events if necessary.
Args:
event_id (str): (optional) If provided, return events preceding
this event, otherwise return the newest events.
max_events (int): Maximum number of events to return. Defaults to
50.
Returns:
List of :class:`.ConversationEvent` instances, ordered
newest-first.
Raises:
KeyError: If ``event_id`` does not correspond to a known event.
.NetworkError: If the events could not be requested.
"""
if event_id is None:
# If no event_id is provided, return the newest events in this
# conversation.
conv_events = self._events[-1 * max_events:]
else:
# If event_id is provided, return the events we have that are
# older, or request older events if event_id corresponds to the
# oldest event we have.
conv_event = self.get_event(event_id)
if self._events[0].id_ != event_id:
conv_events = self._events[self._events.index(conv_event) + 1:]
else:
logger.info('Loading events for conversation {} before {}'
.format(self.id_, conv_event.timestamp))
res = await self._client.get_conversation(
hangouts_pb2.GetConversationRequest(
request_header=self._client.get_request_header(),
conversation_spec=hangouts_pb2.ConversationSpec(
conversation_id=hangouts_pb2.ConversationId(
id=self.id_
)
),
include_event=True,
max_events_per_conversation=max_events,
event_continuation_token=(
hangouts_pb2.EventContinuationToken(
event_timestamp=parsers.to_timestamp(
conv_event.timestamp
)
)
)
)
)
conv_events = [self._wrap_event(event) for event
in res.conversation_state.event]
logger.info('Loaded {} events for conversation {}'
.format(len(conv_events), self.id_))
# Iterate though the events newest to oldest.
for conv_event in reversed(conv_events):
# Add event as the new oldest event, unless we already have
# it.
if conv_event.id_ not in self._events_dict:
self._events.insert(0, conv_event)
self._events_dict[conv_event.id_] = conv_event
else:
# If this happens, there's probably a bug.
logger.info(
'Conversation %s ignoring duplicate event %s',
self.id_, conv_event.id_
)
return conv_events
[docs] def next_event(self, event_id, prev=False):
"""Get the event following another event in this conversation.
Args:
event_id (str): ID of the event.
prev (bool): If ``True``, return the previous event rather than the
next event. Defaults to ``False``.
Raises:
KeyError: If no such :class:`.ConversationEvent` is known.
Returns:
:class:`.ConversationEvent` or ``None`` if there is no following
event.
"""
i = self.events.index(self._events_dict[event_id])
if prev and i > 0:
return self.events[i - 1]
elif not prev and i + 1 < len(self.events):
return self.events[i + 1]
else:
return None
[docs] def get_event(self, event_id):
"""Get an event in this conversation by its ID.
Args:
event_id (str): ID of the event.
Raises:
KeyError: If no such :class:`.ConversationEvent` is known.
Returns:
:class:`.ConversationEvent` with the given ID.
"""
return self._events_dict[event_id]
[docs]class ConversationList(object):
"""Maintains a list of the user's conversations.
Using :func:`build_user_conversation_list` to initialize this class is
recommended.
Args:
client: The connected :class:`Client`.
conv_states: List of ``ConversationState`` messages used to initialize
the list of conversations.
user_list: :class:`.UserList` object.
sync_timestamp (datetime.datetime): The time when ``conv_states`` was
synced.
"""
def __init__(self, client, conv_states, user_list, sync_timestamp):
self._client = client # Client
self._conv_dict = {} # {conv_id: Conversation}
self._sync_timestamp = sync_timestamp # datetime
self._user_list = user_list # UserList
# Initialize the list of conversations from Client's list of
# hangouts_pb2.ConversationState.
for conv_state in conv_states:
self._add_conversation(conv_state.conversation, conv_state.event)
self._client.on_state_update.add_observer(self._on_state_update)
self._client.on_connect.add_observer(self._sync)
self._client.on_reconnect.add_observer(self._sync)
self.on_event = event.Event('ConversationList.on_event')
"""
:class:`.Event` fired when an event occurs in any conversation.
Args:
conv_event: :class:`ConversationEvent` that occurred.
"""
self.on_typing = event.Event('ConversationList.on_typing')
"""
:class:`.Event` fired when a users starts or stops typing in any
conversation.
Args:
typing_message: :class:`~hangups.parsers.TypingStatusMessage` that
occurred.
"""
self.on_watermark_notification = event.Event(
'ConversationList.on_watermark_notification'
)
"""
:class:`.Event` fired when a watermark (read timestamp) is updated for
any conversation.
Args:
watermark_notification:
:class:`~hangups.parsers.WatermarkNotification` that occurred.
"""
[docs] def get_all(self, include_archived=False):
"""Get all the conversations.
Args:
include_archived (bool): (optional) Whether to include archived
conversations. Defaults to ``False``.
Returns:
List of all :class:`.Conversation` objects.
"""
return [conv for conv in self._conv_dict.values()
if not conv.is_archived or include_archived]
[docs] def get(self, conv_id):
"""Get a conversation by its ID.
Args:
conv_id (str): ID of conversation to return.
Raises:
KeyError: If the conversation ID is not found.
Returns:
:class:`.Conversation` with matching ID.
"""
return self._conv_dict[conv_id]
[docs] async def leave_conversation(self, conv_id):
"""Leave a conversation.
Args:
conv_id (str): ID of conversation to leave.
"""
logger.info('Leaving conversation: {}'.format(conv_id))
await self._conv_dict[conv_id].leave()
del self._conv_dict[conv_id]
def _add_conversation(self, conversation, events=[]):
"""Add new conversation from hangouts_pb2.Conversation"""
# pylint: disable=dangerous-default-value
conv_id = conversation.conversation_id.id
logger.debug('Adding new conversation: {}'.format(conv_id))
conv = Conversation(self._client, self._user_list, conversation,
events)
self._conv_dict[conv_id] = conv
return conv
async def _on_state_update(self, state_update):
"""Receive a StateUpdate and fan out to Conversations.
Args:
state_update: hangouts_pb2.StateUpdate instance
"""
# The state update will include some type of notification:
notification_type = state_update.WhichOneof('state_update')
# If conversation fields have been updated, the state update will have
# a conversation containing changed fields. Handle updating the
# conversation from this delta:
if state_update.HasField('conversation'):
try:
await self._handle_conversation_delta(
state_update.conversation
)
except exceptions.NetworkError:
logger.warning(
'Discarding %s for %s: Failed to fetch conversation',
notification_type.replace('_', ' '),
state_update.conversation.conversation_id.id
)
return
if notification_type == 'typing_notification':
await self._handle_set_typing_notification(
state_update.typing_notification
)
elif notification_type == 'watermark_notification':
await self._handle_watermark_notification(
state_update.watermark_notification
)
elif notification_type == 'event_notification':
await self._on_event(
state_update.event_notification.event
)
async def _get_or_fetch_conversation(self, conv_id):
"""Get a cached conversation or fetch a missing conversation.
Args:
conv_id: string, conversation identifier
Raises:
NetworkError: If the request to fetch the conversation fails.
Returns:
:class:`.Conversation` with matching ID.
"""
conv = self._conv_dict.get(conv_id, None)
if conv is None:
logger.info('Fetching unknown conversation %s', conv_id)
res = await self._client.get_conversation(
hangouts_pb2.GetConversationRequest(
request_header=self._client.get_request_header(),
conversation_spec=hangouts_pb2.ConversationSpec(
conversation_id=hangouts_pb2.ConversationId(
id=conv_id
)
), include_event=False
)
)
return self._add_conversation(res.conversation_state.conversation)
else:
return conv
async def _on_event(self, event_):
"""Receive a hangouts_pb2.Event and fan out to Conversations.
Args:
event_: hangouts_pb2.Event instance
"""
conv_id = event_.conversation_id.id
try:
conv = await self._get_or_fetch_conversation(conv_id)
except exceptions.NetworkError:
logger.warning(
'Failed to fetch conversation for event notification: %s',
conv_id
)
else:
self._sync_timestamp = parsers.from_timestamp(event_.timestamp)
conv_event = conv.add_event(event_)
# conv_event may be None if the event was a duplicate.
if conv_event is not None:
await self.on_event.fire(conv_event)
await conv.on_event.fire(conv_event)
async def _handle_conversation_delta(self, conversation):
"""Receive Conversation delta and create or update the conversation.
Args:
conversation: hangouts_pb2.Conversation instance
Raises:
NetworkError: A request to fetch the complete conversation failed.
"""
conv_id = conversation.conversation_id.id
conv = self._conv_dict.get(conv_id, None)
if conv is None:
# Ignore the delta and fetch the complete conversation.
await self._get_or_fetch_conversation(conv_id)
else:
# Update conversation using the delta.
conv.update_conversation(conversation)
async def _handle_set_typing_notification(self, set_typing_notification):
"""Receive SetTypingNotification and update the conversation.
Args:
set_typing_notification: hangouts_pb2.SetTypingNotification
instance
"""
conv_id = set_typing_notification.conversation_id.id
res = parsers.parse_typing_status_message(set_typing_notification)
await self.on_typing.fire(res)
try:
conv = await self._get_or_fetch_conversation(conv_id)
except exceptions.NetworkError:
logger.warning(
'Failed to fetch conversation for typing notification: %s',
conv_id
)
else:
await conv.on_typing.fire(res)
async def _handle_watermark_notification(self, watermark_notification):
"""Receive WatermarkNotification and update the conversation.
Args:
watermark_notification: hangouts_pb2.WatermarkNotification instance
"""
conv_id = watermark_notification.conversation_id.id
res = parsers.parse_watermark_notification(watermark_notification)
await self.on_watermark_notification.fire(res)
try:
conv = await self._get_or_fetch_conversation(conv_id)
except exceptions.NetworkError:
logger.warning(
'Failed to fetch conversation for watermark notification: %s',
conv_id
)
else:
await conv.on_watermark_notification.fire(res)
async def _sync(self):
"""Sync conversation state and events that could have been missed."""
logger.info('Syncing events since {}'.format(self._sync_timestamp))
try:
res = await self._client.sync_all_new_events(
hangouts_pb2.SyncAllNewEventsRequest(
request_header=self._client.get_request_header(),
last_sync_timestamp=parsers.to_timestamp(
self._sync_timestamp
),
max_response_size_bytes=1048576, # 1 MB
)
)
except exceptions.NetworkError as e:
logger.warning('Failed to sync events, some events may be lost: {}'
.format(e))
else:
for conv_state in res.conversation_state:
conv_id = conv_state.conversation_id.id
conv = self._conv_dict.get(conv_id, None)
if conv is not None:
conv.update_conversation(conv_state.conversation)
for event_ in conv_state.event:
timestamp = parsers.from_timestamp(event_.timestamp)
if timestamp > self._sync_timestamp:
# This updates the sync_timestamp for us, as well
# as triggering events.
await self._on_event(event_)
else:
self._add_conversation(conv_state.conversation,
conv_state.event)