Source code for adpay.db.utils

from twisted.internet import defer
from txmongo import filter as txfilter

from adpay import db
from adpay.utils import utils as common_utils


class QueryIterator(object):
    """
    Helper class for iterating over txmongo cursor queries.

    Every query made with ``cursor=True`` can be consumed one document at a
    time from inside an ``inlineCallbacks`` generator::

        _iter = QueryIterator(query)
        while True:
            elem = yield _iter.next()
            if elem is None:
                break
            print "elem", elem

    The end of the results is signalled by ``None``, never by an exception.
    """

    def __init__(self, query):
        """
        Constructor

        :param query: Query object. It is yielded on the first ``next()`` call
            and its result is unpacked as a ``(docs, dfr)`` pair.
        """
        self.query = query
        # Current batch of documents and the deferred yielded to obtain the
        # following batch; both stay None until the first next() call.
        self.docs, self.dfr = None, None
        # Position of the next document to hand out within self.docs.
        self.docs_index = 0

    @defer.inlineCallbacks
    def next(self):
        """
        Fetch the next item from the cursor.

        :return: Deferred firing with a single value from the cursor, or with
            None when the current batch is empty.
        """
        # First call: run the query itself to obtain the first batch.
        if self.docs is None:
            self.docs, self.dfr = yield self.query

        # An empty batch means there is nothing (more) to hand out. This
        # guard must come before self.dfr is touched again.
        if not self.docs:
            defer.returnValue(None)

        # Current batch consumed: pull the following one and start over.
        if self.docs_index >= len(self.docs):
            self.docs, self.dfr = yield self.dfr
            self.docs_index = 0
            if not self.docs:
                defer.returnValue(None)

        # The index is guaranteed to be in range here, so no IndexError
        # fallback is needed. Raising StopIteration inside a generator would
        # become RuntimeError under PEP 479, so exhaustion is reported
        # exclusively through the None results above.
        value = self.docs[self.docs_index]
        self.docs_index += 1
        defer.returnValue(value)
# Campaigns
@defer.inlineCallbacks
def get_campaign(campaign_id):
    """
    Look up a single campaign document by its campaign id.

    :param campaign_id: Campaign id
    :return: Campaign document (result of ``find_one`` on the campaign collection).
    """
    campaigns = yield db.get_campaign_collection()
    selector = {'campaign_id': campaign_id}
    campaign_doc = yield campaigns.find_one(selector)
    defer.returnValue(campaign_doc)
@defer.inlineCallbacks
def get_campaign_iter():
    """
    Iterate over every campaign document in the database.

    :return: Campaign iterable as a QueryIterator.
    """
    campaigns = yield db.get_campaign_collection()
    # cursor=True makes the query consumable batch by batch.
    cursor_query = campaigns.find(cursor=True)
    defer.returnValue(QueryIterator(cursor_query))
@defer.inlineCallbacks
def update_campaign(campaign_doc):
    """
    Store campaign data, replacing the existing document or inserting a new one.

    :param campaign_doc: New campaign data, must include campaign_id to identify existing data.
    :return: deferred instance of :class:`pymongo.results.UpdateResult`.
    """
    campaigns = yield db.get_campaign_collection()
    selector = {'campaign_id': campaign_doc['campaign_id']}
    # upsert=True: insert when no document matches the selector.
    result = yield campaigns.replace_one(selector, campaign_doc, upsert=True)
    defer.returnValue(result)
@defer.inlineCallbacks
def delete_campaign(campaign_id):
    """
    Delete every campaign document carrying the given campaign id.

    :param campaign_id: Main identifier.
    :return: Result of the ``delete_many`` call.
    """
    campaigns = yield db.get_campaign_collection()
    selector = {'campaign_id': campaign_id}
    result = yield campaigns.delete_many(selector)
    defer.returnValue(result)
# Banners
@defer.inlineCallbacks
def get_banner(banner_id):
    """
    Look up a single banner document by its banner id.

    :param banner_id: Banner identifier.
    :return: Banner document (result of ``find_one`` on the banner collection).
    """
    banners = yield db.get_banner_collection()
    selector = {'banner_id': banner_id}
    banner_doc = yield banners.find_one(selector)
    defer.returnValue(banner_doc)
@defer.inlineCallbacks
def get_banners_iter():
    """
    Iterate over every banner document in the database.

    :return: Iterable banner collection (QueryIterator)
    """
    banners = yield db.get_banner_collection()
    cursor_query = banners.find(cursor=True)
    defer.returnValue(QueryIterator(cursor_query))
@defer.inlineCallbacks
def get_campaign_banners(campaign_id):
    """
    Collect the banner documents that belong to one campaign.

    Unlike the ``*_iter`` helpers this runs ``find`` without ``cursor=True``
    and hands back its result directly.

    :param campaign_id: Campaign identifier.
    :return: List of banner documents.
    """
    banners = yield db.get_banner_collection()
    selector = {'campaign_id': campaign_id}
    banner_docs = yield banners.find(selector)
    defer.returnValue(banner_docs)
@defer.inlineCallbacks
def update_banner(banner_doc):
    """
    Store banner data, replacing the existing document or inserting a new one.

    :param banner_doc: New banner data, must include banner_id.
    :return: deferred instance of :class:`pymongo.results.UpdateResult`.
    """
    banners = yield db.get_banner_collection()
    selector = {'banner_id': banner_doc['banner_id']}
    # upsert=True: insert when no document matches the selector.
    result = yield banners.replace_one(selector, banner_doc, upsert=True)
    defer.returnValue(result)
@defer.inlineCallbacks
def delete_campaign_banners(campaign_id):
    """
    Delete every banner document that belongs to the given campaign.

    :param campaign_id: Campaign identifier.
    :return: Result of the ``delete_many`` call.
    """
    banners = yield db.get_banner_collection()
    selector = {'campaign_id': campaign_id}
    result = yield banners.delete_many(selector)
    defer.returnValue(result)
# Events
@defer.inlineCallbacks
def update_event(event_doc):
    """
    Store an event, replacing the document with the same event_id or inserting it.

    Note that ``event_doc['timestamp']`` is overwritten in place with its
    ``timestamp2hour`` value before the document is stored, so the caller's
    dictionary is modified.

    :param event_doc: Event document; must contain 'timestamp' and 'event_id'.
    :return: Result of the ``replace_one`` call.
    """
    hour = common_utils.timestamp2hour(event_doc['timestamp'])
    event_doc['timestamp'] = hour

    events = yield db.get_event_collection()
    selector = {'event_id': event_doc['event_id']}
    result = yield events.replace_one(selector, event_doc, upsert=True)
    defer.returnValue(result)
@defer.inlineCallbacks
def get_banner_events_iter(banner_id, timestamp):
    """
    Fetch the event documents recorded for a banner at a given hour.

    :param banner_id: Banner identifier.
    :param timestamp: Time in seconds since the epoch, used for getting the full hour timestamp.
    :return: Iterable events collection (QueryIterator)
    """
    # Convert exactly once, the same way update_event does before storing an
    # event, so the value queried here matches the value that was stored.
    timestamp = common_utils.timestamp2hour(timestamp)

    collection = yield db.get_event_collection()
    defer.returnValue(QueryIterator(collection.find({
        'banner_id': banner_id,
        'timestamp': timestamp
    }, cursor=True)))
@defer.inlineCallbacks
def get_events_per_user_iter(campaign_id, timestamp, uid):
    """
    Iterate over the events of one user within a campaign at a given hour.

    :param campaign_id: Campaign identifier.
    :param timestamp: Time in seconds since the epoch, used for getting the full hour timestamp.
    :param uid: User identifier.
    :return: Iterable events for the user.
    """
    hour = common_utils.timestamp2hour(timestamp)
    events = yield db.get_event_collection()

    selector = {
        'user_id': uid,
        'campaign_id': campaign_id,
        'timestamp': hour
    }
    cursor_query = events.find(selector, cursor=True)
    defer.returnValue(QueryIterator(cursor_query))
@defer.inlineCallbacks
def get_distinct_users_from_events(campaign_id, timestamp):
    """
    List the distinct user identifiers seen in a campaign's events at a given hour.

    :param campaign_id: Campaign identifier.
    :param timestamp: Time in seconds since the epoch, used for getting the full hour timestamp.
    :return: List of distinct users ids.
    """
    hour = common_utils.timestamp2hour(timestamp)
    events = yield db.get_event_collection()

    selector = {'timestamp': hour, 'campaign_id': campaign_id}
    user_ids = yield events.distinct(key='user_id', filter=selector)
    defer.returnValue(user_ids)
@defer.inlineCallbacks
def delete_event(event_id):
    """
    Delete every event document carrying the given event id.

    :param event_id: Identifier of the event to remove.
    :return: Result of the ``delete_many`` call.
    """
    events = yield db.get_event_collection()
    selector = {'event_id': event_id}
    result = yield events.delete_many(selector)
    defer.returnValue(result)
# Event payments
@defer.inlineCallbacks
def update_event_payment(campaign_id, timestamp, event_id, payment, reason):
    """
    Store the payment information of an event, replacing or inserting it.

    The document is matched on the (event_id, campaign_id) pair.

    :param campaign_id: Campaign identifier.
    :param timestamp: Timestamp (epoch, in seconds); stored after ``timestamp2hour`` conversion.
    :param event_id: Event identifier
    :param payment: Payment amount.
    :param reason: Reason (payment classifier).
    :return: Result of the ``replace_one`` call.
    """
    hour = common_utils.timestamp2hour(timestamp)
    payments = yield db.get_payment_collection()

    selector = {'event_id': event_id, 'campaign_id': campaign_id}
    payment_doc = {
        'timestamp': hour,
        'event_id': event_id,
        'payment': payment,
        'campaign_id': campaign_id,
        'reason': reason
    }
    result = yield payments.replace_one(selector, payment_doc, upsert=True)
    defer.returnValue(result)
@defer.inlineCallbacks
def get_payments_iter(timestamp):
    """
    Iterate over the payment documents stored for a given hour.

    :param timestamp: Timestamp (epoch, seconds) for this request; converted with ``timestamp2hour``.
    :return: Iterable payment information.
    """
    hour = common_utils.timestamp2hour(timestamp)
    payments = yield db.get_payment_collection()

    selector = {'timestamp': hour}
    cursor_query = payments.find(selector, cursor=True)
    defer.returnValue(QueryIterator(cursor_query))
# Calculated payment rounds
@defer.inlineCallbacks
def get_payment_round(timestamp):
    """
    Look up the payment round document for a given hour.

    :param timestamp: Timestamp (epoch, seconds) for this request; converted with ``timestamp2hour``.
    :return: Payment round document (result of ``find_one``).
    """
    hour = common_utils.timestamp2hour(timestamp)
    rounds = yield db.get_payment_rounds_collection()

    selector = {'timestamp': hour}
    round_doc = yield rounds.find_one(selector)
    defer.returnValue(round_doc)
@defer.inlineCallbacks
def update_payment_round(timestamp):
    """
    Record a payment round for the given hour, inserting it when missing.

    The same ``{'timestamp': ...}`` document serves as both the match
    criteria and the new content.

    :param timestamp: Timestamp (epoch, seconds); converted with ``timestamp2hour``.
    :return: Result of the ``update`` call.
    """
    hour = common_utils.timestamp2hour(timestamp)
    rounds = yield db.get_payment_rounds_collection()

    payment_round = {'timestamp': hour}
    result = yield rounds.update(payment_round, payment_round, upsert=True)
    defer.returnValue(result)
@defer.inlineCallbacks
def get_payment_round_iter():
    """
    Iterate over every payment round document in the database.

    :return: Iterable payment round collection.
    """
    rounds = yield db.get_payment_rounds_collection()
    cursor_query = rounds.find(cursor=True)
    defer.returnValue(QueryIterator(cursor_query))
@defer.inlineCallbacks
def delete_payment_round(timestamp):
    """
    Delete the payment round document(s) stored for a given hour.

    :param timestamp: Timestamp (epoch, seconds); converted with ``timestamp2hour``.
    :return: Result of the ``delete_many`` call.
    """
    hour = common_utils.timestamp2hour(timestamp)
    rounds = yield db.get_payment_rounds_collection()

    selector = {'timestamp': hour}
    result = yield rounds.delete_many(selector)
    defer.returnValue(result)
# User Values (Columns: campaign_id, user_id, payment, human_score)
@defer.inlineCallbacks
def get_user_value_iter(campaign_id):
    """
    Iterate over the user value documents of one campaign.

    :param campaign_id: Campaign identifier.
    :return: Iterable user value collection.
    """
    user_values = yield db.get_user_value_collection()
    selector = {'campaign_id': campaign_id}
    cursor_query = user_values.find(selector, cursor=True)
    defer.returnValue(QueryIterator(cursor_query))
@defer.inlineCallbacks
def get_user_value_in_campaign(campaign_id, user_id):
    """
    Look up the user value document of one user within one campaign.

    :param campaign_id: Campaign identifier.
    :param user_id: User identifier.
    :return: One user value document.
    """
    user_values = yield db.get_user_value_collection()
    selector = {
        'campaign_id': campaign_id,
        'user_id': user_id
    }
    user_value_doc = yield user_values.find_one(selector)
    defer.returnValue(user_value_doc)
@defer.inlineCallbacks
def update_user_value_in_campaign(campaign_id, user_id, payment, human_score):
    """
    Store the user value document, replacing or inserting it.

    The document is matched on the (campaign_id, user_id) pair.

    :param campaign_id: Campaign identifier.
    :param user_id: User identifier.
    :param payment: Value stored under the 'payment' key.
    :param human_score: Value stored under the 'human_score' key.
    :return: Result of the ``replace_one`` call.
    """
    user_values = yield db.get_user_value_collection()

    selector = {
        'campaign_id': campaign_id,
        'user_id': user_id
    }
    user_value_doc = {
        'campaign_id': campaign_id,
        'user_id': user_id,
        'payment': payment,
        'human_score': human_score
    }
    result = yield user_values.replace_one(selector, user_value_doc, upsert=True)
    defer.returnValue(result)
# User scores (Columns: campaign_id, timestamp, user_id, score)
@defer.inlineCallbacks
def get_sorted_user_score_iter(campaign_id, timestamp, limit):
    """
    Iterate over a campaign's user scores for a given hour, highest score first.

    :param campaign_id: Campaign identifier.
    :param timestamp: Timestamp; converted with ``timestamp2hour``.
    :param limit: Passed to ``find`` as its ``limit`` argument.
    :return: Descending by score sorted list of user score to limit.
    """
    hour = common_utils.timestamp2hour(timestamp)
    user_scores = yield db.get_user_score_collection()

    selector = {'campaign_id': campaign_id, 'timestamp': hour}
    by_score_desc = txfilter.sort(txfilter.DESCENDING("score"))
    cursor_query = user_scores.find(selector,
                                    sort=by_score_desc,
                                    limit=limit,
                                    cursor=True)
    defer.returnValue(QueryIterator(cursor_query))
@defer.inlineCallbacks
def update_user_score(campaign_id, timestamp, user_id, score):
    """
    Store a user's score for a campaign and hour, replacing or inserting it.

    The document is matched on (campaign_id, timestamp, user_id).

    :param campaign_id: Campaign identifier.
    :param timestamp: Timestamp; converted with ``timestamp2hour``.
    :param user_id: User identifier.
    :param score: Value stored under the 'score' key.
    :return: Result of the ``replace_one`` call.
    """
    hour = common_utils.timestamp2hour(timestamp)
    user_scores = yield db.get_user_score_collection()

    selector = {
        'campaign_id': campaign_id,
        'timestamp': hour,
        'user_id': user_id
    }
    score_doc = {
        'campaign_id': campaign_id,
        'timestamp': hour,
        'user_id': user_id,
        'score': score
    }
    result = yield user_scores.replace_one(selector, score_doc, upsert=True)
    defer.returnValue(result)
@defer.inlineCallbacks
def delete_user_scores(campaign_id, timestamp):
    """
    Delete all user score documents of a campaign for a given hour.

    :param campaign_id: Campaign identifier.
    :param timestamp: Timestamp; converted with ``timestamp2hour``.
    :return: Result of the ``delete_many`` call.
    """
    hour = common_utils.timestamp2hour(timestamp)
    user_scores = yield db.get_user_score_collection()

    selector = {'campaign_id': campaign_id, 'timestamp': hour}
    result = yield user_scores.delete_many(selector)
    defer.returnValue(result)
# User keywords frequency (user_id, keyword, frequency, updated=True)
@defer.inlineCallbacks
def update_user_keyword_frequency(user_id, keyword, frequency, updated=True):
    """
    Store a user keyword frequency document, replacing or inserting it.

    The document is matched on the (keyword, user_id) pair.

    :param user_id: User identifier.
    :param keyword: Keyword the frequency refers to.
    :param frequency: Value stored under the 'frequency' key.
    :param updated: Value stored under the 'updated' key.
    :return: Result of the ``replace_one`` call.
    """
    frequencies = yield db.get_user_keyword_frequency_collection()

    selector = {
        'keyword': keyword,
        'user_id': user_id
    }
    frequency_doc = {
        'keyword': keyword,
        'user_id': user_id,
        'frequency': frequency,
        'updated': updated
    }
    result = yield frequencies.replace_one(selector, frequency_doc, upsert=True)
    defer.returnValue(result)
@defer.inlineCallbacks
def set_user_keyword_frequency_updated_flag(updated=False):
    """
    Set the 'updated' flag on every user keyword frequency document.

    :param updated: Flag value (True/False)
    :return: Result of the ``update_many`` call.
    """
    frequencies = yield db.get_user_keyword_frequency_collection()

    match_all = {}
    change = {"$set": {"updated": updated}}
    result = yield frequencies.update_many(match_all, change)
    defer.returnValue(result)
@defer.inlineCallbacks
def get_user_keyword_frequency(user_id, keyword):
    """
    Look up the keyword frequency document of one user for one keyword.

    :param user_id: User identifier.
    :param keyword: Keyword to look up.
    :return: One user keyword frequency document.
    """
    frequencies = yield db.get_user_keyword_frequency_collection()
    selector = {'keyword': keyword, 'user_id': user_id}
    frequency_doc = yield frequencies.find_one(selector)
    defer.returnValue(frequency_doc)
@defer.inlineCallbacks
def get_user_keyword_frequency_iter(user_id):
    """
    Iterate over all keyword frequency documents of one user.

    :param user_id: User identifier.
    :return: Iterable user keyword frequency collection.
    """
    frequencies = yield db.get_user_keyword_frequency_collection()
    selector = {'user_id': user_id}
    cursor_query = frequencies.find(selector, cursor=True)
    defer.returnValue(QueryIterator(cursor_query))
@defer.inlineCallbacks
def get_user_keyword_frequency_distinct_userids():
    """
    List the distinct user ids present in the user keyword frequency collection.

    :return: Distinct user ids.
    """
    frequencies = yield db.get_user_keyword_frequency_collection()
    user_ids = yield frequencies.distinct(key='user_id')
    defer.returnValue(user_ids)
@defer.inlineCallbacks
def delete_user_keyword_frequency(_id):
    """
    Delete one user keyword frequency document by its Mongo ``_id``.

    :param _id: Mongo document _id.
    :return: Result of the ``delete_one`` call.
    """
    frequencies = yield db.get_user_keyword_frequency_collection()
    selector = {'_id': _id}
    result = yield frequencies.delete_one(selector)
    defer.returnValue(result)
@defer.inlineCallbacks
def update_user_profile(user_id, profile_dict):
    """
    Store a user profile, replacing the existing document or inserting a new one.

    :param user_id: User identifier.
    :param profile_dict: Dictionary of keyword scores, e.g. {'keyword1':score1, 'keyword2':score2, ...}
    :return: Result of the ``replace_one`` call.
    """
    profiles = yield db.get_user_profile_collection()

    selector = {
        'user_id': user_id
    }
    profile_doc = {
        'user_id': user_id,
        'profile': profile_dict
    }
    result = yield profiles.replace_one(selector, profile_doc, upsert=True)
    defer.returnValue(result)
@defer.inlineCallbacks
def get_user_profile(user_id):
    """
    Look up the profile document of one user.

    :param user_id: User identifier.
    :return: One user profile document.
    """
    profiles = yield db.get_user_profile_collection()
    selector = {'user_id': user_id}
    profile_doc = yield profiles.find_one(selector)
    defer.returnValue(profile_doc)
@defer.inlineCallbacks
def delete_user_profiles():
    """
    Delete every document in the user profile collection.

    :return: Result of the ``delete_many`` call.
    """
    profiles = yield db.get_user_profile_collection()
    # An empty filter matches all documents.
    match_all = {}
    result = yield profiles.delete_many(match_all)
    defer.returnValue(result)
@defer.inlineCallbacks
def update_keyword_frequency(keyword, frequency, updated=True):
    """
    Store a keyword frequency document, replacing or inserting it.

    :param keyword: Keyword identifying the document.
    :param frequency: Value stored under the 'frequency' key.
    :param updated: Value stored under the 'updated' key.
    :return: Result of the ``replace_one`` call.
    """
    frequencies = yield db.get_keyword_frequency_collection()

    selector = {
        'keyword': keyword
    }
    frequency_doc = {
        'keyword': keyword,
        'frequency': frequency,
        'updated': updated
    }
    result = yield frequencies.replace_one(selector, frequency_doc, upsert=True)
    defer.returnValue(result)
@defer.inlineCallbacks
def set_keyword_frequency_updated_flag(updated=False):
    """
    Set the 'updated' flag on every keyword frequency document.

    :param updated: Flag value (True/False)
    :return: Result of the ``update_many`` call.
    """
    frequencies = yield db.get_keyword_frequency_collection()

    match_all = {}
    change = {"$set": {"updated": updated}}
    result = yield frequencies.update_many(match_all, change)
    defer.returnValue(result)
@defer.inlineCallbacks
def get_no_updated_keyword_frequency_iter():
    """
    Iterate over keyword frequency documents whose 'updated' flag is False.

    :return: Iterable of not updated keyword frequencies.
    """
    frequencies = yield db.get_keyword_frequency_collection()
    selector = {'updated': False}
    cursor_query = frequencies.find(selector, cursor=True)
    defer.returnValue(QueryIterator(cursor_query))
@defer.inlineCallbacks
def get_keyword_frequency(keyword):
    """
    Look up the frequency document of one keyword.

    :param keyword: Keyword to look up.
    :return: One keyword frequency document.
    """
    frequencies = yield db.get_keyword_frequency_collection()
    selector = {'keyword': keyword}
    frequency_doc = yield frequencies.find_one(selector)
    defer.returnValue(frequency_doc)
@defer.inlineCallbacks
def delete_keyword_frequency(_id):
    """
    Delete keyword frequency document(s) matching the given Mongo ``_id``.

    :param _id: Mongo document identifier.
    :return: Result of the ``delete_many`` call.
    """
    frequencies = yield db.get_keyword_frequency_collection()
    selector = {'_id': _id}
    result = yield frequencies.delete_many(selector)
    defer.returnValue(result)
@defer.inlineCallbacks
def delete_payments(timestamp):
    """
    Delete payment documents whose 'timestamp' is lower than the given one.

    The value is compared as passed in; no ``timestamp2hour`` conversion is
    applied here.

    :param timestamp: Exclusive upper bound for the 'timestamp' field.
    :return: Result of the ``delete_many`` call.
    """
    payments = yield db.get_payment_collection()
    older_than = {'timestamp': {'$lt': timestamp}}
    result = yield payments.delete_many(older_than)
    defer.returnValue(result)


@defer.inlineCallbacks
def delete_events(timestamp):
    """
    Delete event documents whose 'timestamp' is lower than the given one.

    The value is compared as passed in; no ``timestamp2hour`` conversion is
    applied here.

    :param timestamp: Exclusive upper bound for the 'timestamp' field.
    :return: Result of the ``delete_many`` call.
    """
    events = yield db.get_event_collection()
    older_than = {'timestamp': {'$lt': timestamp}}
    result = yield events.delete_many(older_than)
    defer.returnValue(result)