Source code for clink.service.mongo.mongo_sv

from pymongo import MongoClient, IndexModel
from clink.com import stamp, Component

from .error import DocumentNotExist, DocumentIndexError
from .type import MongoConf


@stamp(MongoConf)
[docs]class MongoSv(Component): ''' MongoDB interface in application layer ''' def __init__(self, conf): ''' :param MongoConf conf: ''' self._conf = conf self._doc_specs = [] self._client = None self._doc_names = []
[docs] def use_docspec(self, doc_spec): ''' Put a document's specification under management :param MongoDocSpec doc_spec: ''' db = self._instance() db_docs = db.collection_names() if doc_spec.name in db_docs: self._verify_spec(doc_spec) else: self._create_spec(doc_spec) self._doc_names.append(doc_spec.name)
[docs] def use_docspecs(self, doc_specs): ''' Put document's specifications under management :param list[MongoDocSpec] doc_specs: ''' for spec in doc_specs: self.use_docspec(spec)
[docs] def doc(self, name): ''' Return document object :param str name: :rtype: pymongo.collection.Collection :raise DocumentNotExist: ''' if name not in self._doc_names: raise DocumentNotExist(name) return self._instance()[name]
[docs] def docs(self, *args): ''' Return document objects :param tuple[str] args: :rtype: tuple[pymongo.collection.Collection] :raise DocumentNotExist: ''' for name in args: if name not in self._doc_names: raise DocumentNotExist(name) db = self._instance() return tuple([db[name] for name in args])
[docs] def close(self): ''' Close connection to server ''' if self._client is not None: self._client.close()
[docs] def clear(self): ''' Clear all of data in database ''' self._connect() self._client.drop_database(self._conf.dbname)
def _create_spec(self, doc_spec): db = self._instance() doc = db[doc_spec.name] if len(doc_spec.indexes) > 0: doc.create_indexes(doc_spec.indexes) def _verify_spec(self, doc_spec): ''' Verify that document's specification is valid in database :param MongoDocSpec: :raise DocumentIndexError: ''' db = self._instance() doc_index_info = db[doc_spec.name].index_information() for index in doc_spec.indexes: # check index is exist if index.document['name'] not in doc_index_info: raise DocumentIndexError(doc_spec.name, index) # reconstruct index model from information index_name = index.document['name'] doc_info = doc_index_info[index_name] doc_info_key = doc_info['key'] doc_info['name'] = index_name del doc_info['key'] doc_index = IndexModel(doc_info_key, **doc_info) # compare two index if not self._index_is_equal(index, doc_index): raise DocumentIndexError(doc_spec.name, index) def _index_is_equal(self, index_1, index_2): ''' Compare between two indexes :param pymongo.IndexModel index_1: :param pymongo.IndexModel index_2: :rtype: bool ''' # index_1, index_2: pymongo.IndexModel doc_1 = index_1.document doc_2 = index_2.document for k in doc_1: if k not in doc_2: return False if doc_1[k] != doc_2[k]: return False return True def _connect(self): ''' Make sure that client connect to server ''' if self._client is None: self._client = MongoClient(host=self._conf.dburl) def _instance(self): ''' Return instance of database object :rtype: pymongo.database.Database ''' self._connect() return self._client[self._conf.dbname]