#!/usr/bin/env python #-*- coding: ISO-8859-1 -*- # # Author: Valentin Kuznetsov, Cornell University, 2006 # # system modules import os, string, sys, types, cgi # SQLAlchemu modules #from sqlalchemy import * #import sqlalchemy.mods.threadlocal # DBS modules from DBSTables import * from dbsApi import DbsApi, DbsApiException, InvalidDataTier from dbsPrimaryDataset import * from dbsFileBlock import * from dbsApplication import * from dbsProcessing import * from dbsProcessedDataset import * from dbsBaseObject import * from dbsParameterSet import * from dbsApplicationConfig import * def printExcept(): """print exception type, value and traceback on stderr""" sys.excepthook(sys.exc_info()[0],sys.exc_info()[1],sys.exc_info()[2]) class DbsPatternError(DbsApiException): def __init__ (self, **kwargs): DbsApiException.__init__(self, **kwargs) class DbsDatabaseError(DbsApiException): def __init__ (self, **kwargs): printExcept() DbsApiException.__init__(self, **kwargs) def printList(self,msg,iList): """ Print elements of the provided list to stdout """ print msg for i in iList: print i print def demanglePattern(pattern): """ Demangle given pattern into elements. Return a list of elements, e.g. /path1/path2/path3 will produce a list ['path1','path2','path3'] """ if pattern=="*" or not pattern: return ['','',''] components = string.split(os.path.normpath(pattern),'/') if pattern[0]!='/': msg = "path pattern '%s' doesn't start with /"%pattern raise DbsPatternError(args=msg) # replace '*' with '' in pattern, since later we'll skip such where clause for idx in xrange(0,len(components)): if components[idx]=="*": components[idx]='' return components[1:] def makeNamed(iTable,iName): """ Insert iName into iTable, where iTable=(id,name) """ # first look if name already exists in a table id = getNamed(iTable,iName) if id: return id res = "" try: res = iTable.insert().execute(name=iName) except Exception, ex: raise DbsDatabaseError(args=ex) return res.last_inserted_ids()[0] def getNamed(iTable,iName): """ Get id from iTable for given iName. """ # print "SELECT FROM %s WHERE %s"%(iTable,iName) res = select([iTable.c.id],and_(iTable.c.name==iName)).execute() tup = res.fetchone() if tup and tup[0]: return tup[0] return "" class DBSManager(DbsApi): def __init__(self,verbose=0): self.verbose = verbose if verbose: DBS_DB.echo=True else: DBS_DB.echo=False def printFunc(self,name,arg): """ Print to stdout name of the class method with its parameters. """ if self.verbose: print "### %s.%s\n(" % (self.__class__.__name__, name) if arg: print " ",arg print ")" print def listPrimaryDatasets(self, pattern="*"): """ Retrieve list of primary datasets matching a shell glob pattern. Returns a list of DbsPrimaryDataset objects. If the pattern is given, it will be matched against the dataset name as a shell glob pattern. May raise an DbsApiException. """ self.printFunc("listPrimaryDatasets",pattern) res = "" try: if pattern=="*": # we need to select everything res = t_primary_dataset.select().execute() else: res = t_primary_dataset.select(t_primary_dataset.c.name.like('%%%s%%'%pattern)).execute() except Exception, ex: raise DbsDatabaseError(args=ex) result = [] for row in res.fetchall(): result.append( DbsPrimaryDataset( objectId=long(row[0]), datasetName=str(row[1]) ) ) return result def listProcessedDatasets(self, pattern="*"): """ Retrieve list of processed datasets matching a shell glob pattern. Returns a list of DbsProcessedDataset objects. If the pattern is given, it will be matched against the dataset path as a shell glob pattern. SQL query: select procds.id, primds.name, procname.name, dt.name from t_processed_dataset procds join t_primary_dataset primds on primds.id = procds.primary_dataset join t_processing_name procname on procname.id = procds.name join t_data_tier dt on dt.id = procds.data_tier May raise an DbsApiException. """ self.printFunc("listProcessedDatasets",pattern) tprd = t_processed_dataset.alias('tprd') tpmd = t_primary_dataset.alias('tpmd') tprn = t_processing_name.alias('tprn') tdt = t_data_tier.alias('tdt') sel = select([tprd.c.id,tpmd.c.name,tprn.c.name,tdt.c.name]) try: if pattern!="*": tpmd_name, tdt_name, tprd_name = demanglePattern(pattern) if tpmd_name: sel.append_whereclause(tpmd.c.name==tpmd_name) if tdt_name: sel.append_whereclause(tdt.c.name==tdt_name) if tprd_name: sel.append_whereclause(tprd.c.name==tprd_name) sel.distinct=True res = sel.execute() except Exception, ex: raise DbsDatabaseError(args=ex) result = [] for row in res.fetchall(): id = long(row[0]) name = str( '/' + row[1]+'/' + row[2] + '/' + row[3] ) result.append( DbsProcessedDataset (objectId=id, datasetPathName=name ) ) return result def listParameterSets(self, pattern="*"): """ Retrieve list of parameter sets matching a shell glob pattern. Returns a list of DbsParameterSet objects. If the pattern is given, it will be matched against the content as a shell glob pattern. SQL query: select id, hash, content from t_parameter_set May raise an DbsApiException. """ self.printFunc("listParameterSets",pattern) res = "" try: if pattern=="*": # we need to select everything res = t_parameter_set.select().execute() else: res = t_parameter_set.select(t_parameter_set.c.content.like('%%%s%%'%pattern)).execute() except Exception, ex: raise DbsDatabaseError(args=ex) result = [] for row in res.fetchall(): result.append(DbsParameterSet(objectId=long(row[0]),hash=str(row[1]),content=str(row[2]))) return result def listApplications(self, pattern="*"): """ Retrieve list of applications matching a shell glob pattern. Returns a list of DbsApplication objects. If the pattern is given, it will be matched against the application label as /family/executable/version as a shell glob pattern. SQL query: select a.id, af.name, a.executable, a.app_version from t_application a join t_app_family af on af.id = a.app_family May raise an DbsApiException. """ self.printFunc("listApplications",pattern) tapp = t_application.alias('tapp') tapf = t_app_family.alias('tapf') res = "" sel = select([tapp.c.id,tapf.c.name,tapp.c.executable,tapp.c.app_version]) try: if pattern!="*": family, exe, ver = demanglePattern(pattern) if familty: sel.append_whereclause(tapf.c.name==family) if tdt_name: sel.append_whereclause(tapp.c.executable==exe) if tprd_name: sel.append_whereclause(tapp.c.app_version==ver) sel.distinct=True res = sel.execute() except Exception, ex: raise DbsDatabaseError(args=ex) result = [] app_result = {} for row in res.fetchall(): app_result['objectId'] = long(row[0]) app_result['family'] = str(row[1]) app_result['executable'] = str(row[2]) app_result['testversion'] = str(row[3]) result.append( DbsApplication(application=app_result) ) return result def listApplicationConfigs(self, pattern="*"): """ Retrieve list of application configurations matching a shell glob pattern. Returns a list of DbsApplicationConfig objects. If the pattern is given, it will be matched against the application label as /family/executable/version as a shell glob pattern; all the parameter sets used with that application will be returned. May raise an DbsApiException. SQL query: select ac.id, a.id, af.name, a.executable, a.app_version, p.id, p.hash, p.content from t_app_config ac join t_application a on a.id = ac.application join t_app_family af on af.id = a.app_family join t_parameter_set p on p.id = ac.parameter_set """ self.printFunc("listApplicationConfigs",pattern) tapp = t_application.alias('tapp') tapf = t_app_family.alias('tapf') tapc = t_app_config.alias('tapc') tps = t_parameter_set.alias('tps') res = "" sel = select([tapc.c.id,tapp.c.id,tapf.c.name,tapp.c.executable,tapp.c.app_version, tps.c.id,tps.c.hash,tps.c.content]) try: if pattern!="*": family, exe, ver = demanglePattern(pattern) if familty: sel.append_whereclause(tapf.c.name==family) if tdt_name: sel.append_whereclause(tapp.c.executable==exe) if tprd_name: sel.append_whereclause(tapp.c.app_version==ver) sel.distinct=True res = sel.execute() except Exception, ex: raise DbsDatabaseError(args=ex) result = [] app_result = {} ps_result = {} for row in res.fetchall(): app_result['objectId'] = long(row[1]) app_result['family'] = str(row[2]) app_result['executable'] = str(row[3]) app_result['testversion'] = str(row[4]) ps_result['objectId'] = long(row[5]) ps_result['hash'] = str(row[6]) ps_result['content'] = str(row[7]) appConfig= {'objectId': long(row[0]), 'application': app_result, 'parameterSet' : ps_result } result.append(DbsApplicationConfig(applicationConfig=appConfig)) return result def getDatasetProvenance(self, dataset, tiers=[]): """ Retrieve the dataset parents of the dataset. If tiers is an empty list, retrieves all parents. Otherwise returns only the data tiers that match. The result is a list of DbsParents with parentage type set and referring to DbsProcessedDatasets with path name and data tier filled in. The input dataset should be an DbsProcessedDataset with path set, or primary dataset, tier and processed dataset name filled in. For backwards compatibility a simple dataset path name string is also accepted. Raises InvalidDataTier if tiers includes an unknown data tier, InvalidDatasetPathName if the dataset path is invalid, otherwise may raise an DbsApiException. SQL query: select distinct pt.name, procds.id, procds.data_tier, primds.name, procname.name from t_event_collection ec join t_evcoll_parentage ep on ep.child = ec.id join t_event_collection ec2 on ec2.id = ep.parent join t_processed_dataset procds on procds.id = ec2.processed_dataset join t_processing_name procname on procname.id = procds.name join t_primary_dataset primds on primds.id = procds.primary_dataset join t_parentage_type pt on pt.id = ep.type where ec.processed_dataset = """ self.printFunc("getDatasetProvenance",(dataset,tiers)) return def datasetFromPath(self,datasetPath): """ Return a unque id of processed dataset upon request of dataset path. May raise an DbsApiException. SQL query: select procds.id from t_processed_dataset procds join t_primary_dataset primds on primds.id = procds.primary_dataset join t_processing_name proname on proname.id = procds.name join t_data_tier dt on dt.id = procds.data_tier where proname.name = :procname and primds.name = :primname and dt.name = :tiername """ self.printFunc("datasetFromPath",datasetPath) procname, primname, tiername = demanglePattern(datasetPath) tprd = t_processed_dataset.alias('tprd') tpm = t_primary_dataset.alias('tpd') tpn = t_processing_name.alias('tpn') tdt = t_data_tier.alias('tdt') res = "" sel = select([tprd.c.id],distinct=True) if procname: sel.append_whereclause(tpn.c.name==procname) if primname: sel.append_whereclause(tpm.c.name==primname) if tiername: sel.append_whereclause(tdt.c.name==tiername) try: res = sel.execute() except Exception, ex: raise DbsDatabaseError(args=ex) return res.fetchone() def getDatasetContents(self, dataset): """ Retrieve the event collections of dataset by file block. Returns a list of DbsFileBlock objects, with event collection list filled with DbsEventCollection objects. The input dataset should be an DbsProcessedDataset with path set, or a DbsProcessedDataset with primary dataset, tier and processed dataset name filled in. For backwards compatibility a simple dataset path name string is also accepted. Raises InvalidDatasetPathName if the path is invalid, otherwise may raise an DbsCgiApiException. See getDatasetFiles() for a version that returns files. SQL query: select distinct evc.id, evc.name, evc.events, evs.name, b.id, bs.name from t_event_collection evc join t_evcoll_file evf on evf.evcoll = evc.id left join t_evcoll_status evs on evs.id = evc.status join t_file f on f.id = evf.fileid join t_block b on b.id = f.inblock join t_block_status bs on bs.id = b.status where evc.processed_dataset = :id order by b.id, evc.name """ self.printFunc("getDatasetContents",dataset) datasetPath = dataset['datasetPath'] tevc = t_event_collection.alias('tevc') tevf = t_evcoll_file.alias('tevf') tevs = t_evcoll_status.alias('tevs') tf = t_file.alias('tf') tb = t_block.alias('tb') tbs = t_block_status.alias('tbs') res = "" sel = select([tevc.c.id,tevc.c.name,tevc.c.events,tevs.c.name,tb.c.id,tbs.c.name], tevc.c.processed_dataset==self.datasetFromPath(datasetPath), order_by=[tb.c.id, tevc.c.name], distinct=True) try: res = sel.execute() except Exception, ex: raise DbsDatabaseError(args=ex) result = [] app_result = {} ps_result = {} for row in res.fetchall(): print row return result def getDatasetFileBlocks(self, dataset): """ Retrieve the files related to the dataset, organised by file block. Returns a list of DbsFileBlock objects, each with the file list filled with DbsFile objects. Note that the files may contain data for other datasets. The input dataset should be an DbsProcessedDataset with path set, or a DbsProcessedDataset with primary dataset, tier and processed dataset name filled in. For backwards compatibility a simple dataset path name string is also accepted. Raises InvalidDatasetPathName if the path is invalid, otherwise may raise an DbsApiException. See getDatasetContents() for a version that returns event collections. SQL query: select f.id,f.logical_name,f.guid,f.filesize,f.checksum,fs.name,ft.name, b.id,b.files,b.bytes,bs.name from t_processed_dataset pd join t_processing p on p.primary_dataset = pd.primary_dataset and p.name = pd.name join t_block b on b.processing = p.id join t_block_status bs on bs.id = b.status left join t_file f on f.inblock = b.id left join t_file_status fs on fs.id = f.status left join t_file_type ft on ft.id = f.type where pd.id = :id order by b.id, f.logical_name """ self.printFunc("getDatasetFileBlocks",dataset) datasetPath = dataset['datasetPath'] # Translate the dataset path to an id id = datasetFromPath(datasetPath) tpd = t_processed_dataset.alias('tpd') tp = t_processing.alias('tp') tb = t_block.alias('tb') tf = t_file.alias('tf') tfs = t_file_status.alias('tfs') tft = t_file_type.alias('tft') res = "" sel = select([tf.c.id,tf.c.logical_name,tf.c.guid,tf.c.filesize,tf.c.checksum, tfs.c.name,tft.c.name,tb.c.id,tb.c.files,tb.c.bytes,tbs.c.name], tp.c.primary_dataset==tpd.c.primary_dataset,tp.c.name==tpd.c.name, tb.c.processing==tp.c.id,tbs.c.id==tb.c.status,tf.c.inblock==tb.c.id, tfs.c.id==tf.status,tft.c.id==tf.type, tpd.c.id==id, order_by=[tb.c.id, tf.c.logical_name], distinct=True) try: res = sel.execute() except Exception, ex: raise DbsDatabaseError(args=ex) result = [] app_result = {} ps_result = {} for row in res.fetchall(): print row return result def createPrimaryDataset(self, dataset): """ Create a new primary dataset. Returns newly created id from t_primary_dataset table. The input object should be a DbsPrimaryDataset with the name set. Raises L{DbsDatabaseError} if a primary dataset already exists in the database, otherwise may raise an L{DbsApiException}. @type dataset: DbsPrimaryDataset @param dataset: a primary dataset tuple @rtype: integer @return: primary dataset id from t_primary_dataset. """ self.printFunc("createPrimaryDataset",dataset) datasetName = dataset['datasetName'] c = select([t_primary_dataset.c.id],and_(t_primary_dataset.c.name==datasetName)).execute() res = c.fetchone() if res: return res[0] session = create_session() transaction = session.create_transaction() try: tp = T_PRIMARY_DATASET(datasetName) session.save(tp) session.flush() except Exception, ex: transaction.rollback() raise DbsDatabaseError(args=ex) transaction.commit() return tp.id def createProcessing(self, processing): """ Create a new processing. Instantiates a database entity for the processing, and updates input object for the id of the new row. The input object should be a DbsProcessing duly filled in, with a reference to a primary dataset and application configuration, and optionally a parent. Application-related information is automatically instantiated in the database if it doesn't exist. Raises DbsCgiObjectExists if a primary dataset already exists in the database, DbsCgiNoObject if the primary dataset, or parent if one was specified, doesn't exist in the database, otherwise may raise an DbsApiException. SQL query: select p.id from t_processing p join t_processing_name pn on pn.id = p.name join t_primary_dataset pd on pd.id = p.primary_dataset where pn.name = :processing_name """ self.printFunc("createProcessing",processing) processingName = processing['processingName'] # check if we already have such data tp = t_processing.alias('tp') tpn = t_processing_name.alias('tpn') c = select([tp.c.id],and_(tpn.c.name==processingName,tp.c.name==tpn.c.id)).execute() res = c.fetchone() if res: processing = res[0] return processing # first create primary dataset if it doesn't exists dataset = processing['primaryDataset'] primary_dataset_id = self.createPrimaryDataset(dataset) primaryDatasetName = dataset['datasetName'] family_name = processing['applicationConfig']['application']['family'] exe = processing['applicationConfig']['application']['executable'] ver = processing['applicationConfig']['application']['version'] hash = processing['applicationConfig']['parameterSet']['hash'] content = processing['applicationConfig']['parameterSet']['content'] isOpen = 'n' if processing.has_key('isOpen') and processing['isOpen']: isOpen = 'y' session = create_session() transaction = session.create_transaction() try: taf = T_APP_FAMILY(family_name) tap = T_APPLICATION(exe,ver) taf.t_application.append(tap) tac = T_APP_CONFIG() tps = T_PARAMETER_SET(hash,content) tps.t_app_config.append(tac) tap.t_app_config.append(tac) tpn = T_PROCESSING_NAME(processingName) tpd = session.query(T_PRIMARY_DATASET).get_by(name=primaryDatasetName) tpr = T_PROCESSING(isOpen=isOpen) tpn.t_processing.append(tpr) tpr.primary_dataset=tpd.id tac.t_processing.append(tpr) [session.save(x) for x in [taf,tap,tac,tps,tpn,tpd,tpr]] session.flush() except Exception, ex: transaction.rollback() raise DbsDatabaseError(args=ex) transaction.commit() return tpr.id def createProcessedDataset(self, dataset): """ Create a new processed dataset. Instantiates a database entity for the dataset, and updates input object for the id of the new row. The input object should be a DbsProcessedDataset filled in, referring to a DbsPrimaryDataset and having data tier and dataset name set. On return the dataset's id will be updated. Raises DbsCgiObjectExists if the dataset already exists, or DbsCgiNoObject if required path components, the primary dataset and the processing name created through createProcessing(), do not exist in the database; otherwise may raise an DbsApiException. """ self.printFunc("createProcessedDataset",dataset) primary = dataset['primaryDataset'] datasetName = dataset['datasetName'] tier = dataset['dataTier'] primaryDatasetName = primary['datasetName'] processingName = dataset['datasetName'] # processing = dataset['processing'] # family_name = processing['applicationConfig']['application']['family'] # exe = processing['applicationConfig']['application']['executable'] # ver = processing['applicationConfig']['application']['version'] # hash = processing['applicationConfig']['parameterSet']['hash'] # content = processing['applicationConfig']['parameterSet']['content'] # processingName = processing['processingName'] # primaryDatasetName = primary['name'] # isOpen = 'n' # if dataset['isDatasetOpen']: # isOpen = 'y' # first createProcessing if it doesn't exists # proc_id = self.createProcessing(processing) session = create_session() transaction = session.create_transaction() try: tpn = session.query(T_PROCESSING_NAME).get_by(name=processingName) tpd = session.query(T_PRIMARY_DATASET).get_by(name=primaryDatasetName) tdt_id = makeNamed(t_data_tier,tier) try: tprd = session.query(T_PROCESSED_DATASET).get_by(primary_dataset=tpd.id,name=tpn.id,data_tier=tdt_id) transaction.commit() return tprd.id except: pass # tdt = session.query(T_DATA_TIER).get_by(name=tier) # tdt = T_DATA_TIER(tier) tprd = T_PROCESSED_DATASET() # tdt.t_processed_dataset.append(tprd) tprd.primary_dataset=tpd.id # tprd.data_tier=tdt.id tprd.data_tier=tdt_id tprd.name=tpn.id # [session.save(x) for x in [tdt,tprd]] session.save(tprd) session.flush() except Exception, ex: transaction.rollback() raise DbsDatabaseError(args=ex) transaction.commit() return tprd.id def insertEventCollections(self, dataset, eventCollectionList): """ Insert event collections to a processed dataset. Instantiates a database row for each element of the event collection list. The objects are *not* updated for database id on return. The dataset should be a DbsProcessedDataset. The event collections should be DbsEventCollection objects fully described, including name and number of events, event collection parentage referring to other DbsEventCollection objects, and the list of files as DbsFile the collections are mapped to. The files must have their logical names names set, and are assumed to already exist in the database. Raises DbsCgiObjectExists if any event collection already exists, or DbsCgiNoObject if the processed dataset, parent collections or the files do not exist in the database; otherwise may raise an DbsApiException. SQL queries: insert into t_event_collection (id, processed_dataset, name, events, status) values (seq_event_collection.nextval, ?, ?, ?, ?)}); insert into t_evcoll_parentage (id, parent, child, type) select seq_evcoll_parentage.nextval, p.id, c.id, ? from t_event_collection p, t_event_collection c where p.name = ? and c.name = ?}); insert into t_evcoll_file (id, evcoll, fileid) select seq_evcoll_file.nextval, e.id, f.id from t_event_collection e, t_file f where e.name = ? and f.logical_name = ?}); """ self.printFunc("insertEventCollections",(dataset, eventCollectionList)) processed_id = self.createProcessedDataset(dataset) session = create_session() transaction = session.create_transaction() try: for evc in eventCollectionList: status = makeNamed(t_evcoll_status,evc['collectionStatus']) tevc = T_EVENT_COLLECTION(processed_id,evc['collectionName'],evc['numberOfEvents'],status) # NOTE: in current DBS API they use insertFiles and insertEventCollection # separetely, which very inefficient since they'll need to invoke select here for # every inserted files # Instead I think would be better to have insertFile, insertFiles and # insertEventCollection for f in evc['fileList']: # file_id = self.insertFile(block,f) tFile = session.query(T_FILE).get_by(logical_name=f['logicalFileName']) tevcf = T_EVCOLL_FILE(iFileId=tFile.id) tevc.t_evcoll_file.append(tevcf) session.save(tevc) session.save(tevcf) # TODO: add parentage insert # for p in enc['parentageList']: # tpt = T_PARENTAGE_TYPE(p['type']) # tevcp = T_EVCOLL_PARENTAGE() session.flush() except Exception, ex: transaction.rollback() raise DbsDatabaseError(args=ex) transaction.commit() return def createFileBlock(self, fileBlock): """ Create a new file block. Instantiates a database entity for the block, and updates input object for the id of the new row. The input object should be a DbsFileBlock duly filled in, referring to a DbsProcessing. On successful return the block's id and name will have been updated; the block will be open. Raises DbsCgiNoObject if the processing does not exist in the database, otherwise may raise an DbsApiException. SQL query: select p.id from t_processing p join t_processing_name pn on pn.id = p.name join t_primary_dataset pd on pd.id = p.primary_dataset where pn.name = :processing_name and pd.name = :primary_name insert into t_block values() """ self.printFunc("createFileBlock",fileBlock) processing = fileBlock['processing'] proc_id = self.createProcessing(processing) iStatus = makeNamed(t_block_status,"OPEN") # check if we already have this block tb = t_block.alias('tb') c = select([tb.c.id],and_(tb.c.processing==proc_id,tb.c.status==iStatus)).execute() res = c.fetchone() if res: return res[0] session = create_session() transaction = session.create_transaction() try: tb = T_BLOCK(proc_id,iStatus) session.save(tb) session.flush() except Exception, ex: transaction.rollback() raise DbsDatabaseError(args=ex) transaction.commit() return tb.id def insertFiles(self, block, fList): """ Insert files to an existing block. Instantiates a database row for each element of the file list. The objects are *not* updated for database id on return. The block should be a DbsFileBlock. The files should be DbsFile objects fully described, including name, file size, checksum, type and optionally status. Raises DbsCgiObjectExists if any of the files already exists, or DbsCgiNoObject if the block does not exist in the database; otherwise may raise an DbsApiException. SQL queries: select id from t_block where id = :id for update}, ":id" => $blockid)->fetchrow (); insert into t_file (id, logical_name, guid, checksum, filesize, type, status, inblock) values (seq_file.nextval, ?, ?, ?, ?, ?, ?, ?)}); """ self.printFunc("insertFiles",(block,fList)) # iBlock = self.createFileBlock(block) session = create_session() transaction = session.create_transaction() try: for fd in fList: id = self.insertFile(block,fd) # t_file.insert().execute(logical_name=fd['logicalFileName'],guid=fd['guid'], # checksum=fd['checkSum'],filesize=fd['fileSize'],type=fd['fileType'], # inblock=iBlock) except Exception, ex: transaction.rollback() raise DbsDatabaseError(args=ex) transaction.commit() return def insertFile(self, block, dbsFile): """ Insert a single file to an existing block. The block should be a DbsFileBlock. The file should be DbsFile objects fully described, including name, file size, checksum, type and optionally status. Return newly created file id. Raises DbsCgiObjectExists if any of the files already exists, or DbsCgiNoObject if the block does not exist in the database; otherwise may raise an DbsApiException. SQL queries: select id from t_block where id = :id for update}, ":id" => $blockid)->fetchrow (); insert into t_file (id, logical_name, guid, checksum, filesize, type, status, inblock) values (seq_file.nextval, ?, ?, ?, ?, ?, ?, ?)}); """ self.printFunc("insertFile",(block,dbsFile)) iBlock = self.createFileBlock(block) lName = dbsFile['logicalFileName'] if dbsFile.has_key('guid'): guid = dbsFile['guid'] else: guid = 0 #FIXME chkSum= dbsFile['checkSum'] fSize = dbsFile['fileSize'] fType = dbsFile['fileType'] if dbsFile.has_key('fileStatus'): fStatus = dbsFile['fileStatus'] else: fStatus = "" # check if we already have this file tf = t_file.alias('tf') c = select([tf.c.id],and_(tf.c.logical_name==lName,tf.c.guid==guid,tf.c.checksum==chkSum,tf.c.filesize==fSize,tf.c.type==fType)).execute() res = c.fetchone() if res and res[0]: return res[0] session = create_session() transaction = session.create_transaction() try: fStatus_id = makeNamed(t_file_status,fStatus) fType_id = makeNamed(t_file_type,fType) tf = T_FILE(guid,lName,chkSum,fSize,fStatus_id,fType_id,iBlock) session.save(tf) session.flush() except Exception, ex: transaction.rollback() raise DbsDatabaseError(args=ex) transaction.commit() return tf.id # # main # if __name__ == "__main__": basename = 'test_datasetName' application = {'executable' : 'testexe','version' : 'testversion','family' : 'testfamily'} appConfig= {'application': application, 'parameterSet' : {'hash': 'myhash', 'content': 'mycontent'} } primary = DbsPrimaryDataset (name = basename) dbsApplication = DbsApplication (application = application) dbsProcessing = DbsProcessing (primaryDataset = primary, processingName = "test_process", applicationConfig = appConfig, isOpen=0 ) fileblock = DbsFileBlock (processing = dbsProcessing) dataset = DbsProcessedDataset (primaryDataset=primary, processing=dbsProcessing, datasetName="test_process", dataTier="GEN", datasetPath='/X/Y/Z/', isDatasetOpen=0 ) manager = DBSManager() # test createPrimaryDataset res = manager.createPrimaryDataset(primary) print res res = manager.listPrimaryDatasets() print res # test createProcessedDataset processingId = manager.createProcessing(dbsProcessing) print "### Got processing id: %s" % processingId processedDatasetId = manager.createProcessedDataset(dataset) print "### Got processed dataset id: %s" % processedDatasetId res = manager.listProcessedDatasets() print res # test pattern pattern = '/%s/*/*'%basename res = manager.listProcessedDatasets(pattern) print res res = manager.listApplications() print res res = manager.listApplicationConfigs() print res # test createFileBlock for table in tableList: try: table.drop() table.create() except: raise "Fail to create %s"%table.name processingId = manager.createFileBlock(fileblock) print "### Got fileblock id: %s" % processingId print "+++ Test getDatasetContents" res = manager.getDatasetContents(dataset) print res