文章目录提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
- 前言
- 一、Python如何自动化?
- 二、使用步骤
- 1.程序结构
- 2.引入库
- 3.Python读取PO数据
- 4.config配置
- 5.models实体-工具代码生成
- 6.SQLAlchemy数据访问DB
- 7.主程序执行
- 总结
前言
提示:这里可以添加本文要记录的大概内容:
老板分配了点杂七杂八的工作,其中有项工作内容是采购买完在SAP里出PO后,会将他们出的PO的单发给各个部门,我们会将出的PO要手动录到另外系统里接着去下面的审批和执行,从Excel,再到系统查,再打开输入,输入完再审批,数据多了可能会录差了,
提示:以下是本篇文章正文内容,下面案例可供参考
一、Python如何自动化?Python自动化既保证了效率,又确保了数据的一致性,而手动填表误差在所难免,经常出现“数据打架”的问题。
工作内容分析:
1、采购PO的单数据分析
2、IT系统的数据分析
3、两块数据结构分析关联性
4、通过Python自动化的可行性
代码如下(示例):
pip install sqlalchemy pip install Flask-SQLAlchemy pip install cx_Oracle pip install sqlacodegen pip install flask-sqlacodegen pip install pandas3.Python读取PO数据
代码如下(示例):
class excelHelp(): def __init__(self): self.url=r"D:MyProjectSunShengNanIT纸档PR审批记录跟踪表220525-1.xlsx" def read(self): try: wb = xlrd.open_workbook(self.url) sht = wb.sheets()[0] aa = sht.name bb = sht.cell_value(1, 1) print(aa, bb) except Exception as e: print('exception!!!' + e) def readbydf(self,epr): #空列表 Temp_List=[] #空字典 #Temp_Dic = {} try: df = pd.read_excel(self.url,dtype={"成本中心":"str","工号":"str","备件编码":"str"},encoding="utf8") data = df[["内部编号","PO","成本中心","工号","负责人","备件编码"]] #默认axis=0即删除包含NaN值的行 newdata = data.dropna() #print(newdata.head(10)) result = newdata[(newdata["负责人"].str.startswith("张龙"))&(newdata["内部编号"]==epr)] #result = newdata[(newdata["负责人"].str.startswith("张龙"))] #df2 = result.reset_index() #print(df2.head(10)) #删除列 #pdf=result.drop(result.columns[0], axis=1) #生成一个namedtuples类型数据 pdf = result.itertuples(index=False,name='') #for row in pdf: #print(row) #转List #t = np.array(result) #Temp_List = t.tolist() # 转换为字典 #Temp_Dic = result.to_dict() #转换为元组列表 Temp_List = list(pdf); except Exception as e: print('exception!!!' + e) return Temp_List4.config配置 5.models实体-工具代码生成 6.SQLAlchemy数据访问DB
代码如下(示例):
class ItexpenseInfDao(object): def __init__(self,app): self.db = SQLAlchemyDB(app) def getItem(self,folio): item = self.db.session.query(ItexpenseInf.folio,ItexpenseInf.epr,ItexpenseInf.po).filter( ItexpenseInf.folio==folio).first() return item def getItemList(self,folio): itemList = self.db.session.query(ItexpenseInf.folio,ItexpenseInf.status,ItexpenseInf.epr,ItexpenseInf.po,ItexpenseInf.pending_step).filter( ItexpenseInf.pending_step=='P5_PUR_PO',ItexpenseInf.folio.like('{0}%'.format(folio))).all() return itemList def getItems(self,epr): itemList = self.db.session.query(ItexpenseInf.folio, ItexpenseInf.status, ItexpenseInf.epr, ItexpenseInf.po, ItexpenseInf.pending_step).filter( ItexpenseInf.epr.like('{0}%'.format(epr))).all() return itemList def updateItem(self,obj): self.db.session.query(ItexpenseInf).filter(ItexpenseInf.folio == obj.folio).update({'po': obj.po}) self.db.session.commit() def addItem(self,obj): self.db.session.add(obj) self.db.session.commit() def updateItem(self,obj): self.db.session.query(ItexpenseInf).filter(ItexpenseInf.folio == obj.folio).delete() self.db.session.commit() def updateInf(self,obj,item): try: self.db.session.query(ItexpenseInf).filter(ItexpenseInf.folio == obj.folio).update({'po': obj.po}) self.db.session.query(ItexpenseItem).filter(ItexpenseItem.folio == item.folio).update({'po': item.po}) self.db.session.commit() except Exception as e: self.db.session.rollback() raise e def update_db_data(self,obj,item): with self.db.auto_commit_db(): self.db.session.query(ItexpenseInf).filter(ItexpenseInf.folio == obj.folio).update({'po': obj.po}) self.db.session.query(ItexpenseItem).filter(ItexpenseItem.folio == item.folio).update({'po': item.po}) class ItexpenseItemDao(object): def __init__(self,app): self.db = SQLAlchemyDB(app) def getItemList(self,epr,folio): itemList = self.db.session.query(ItexpenseItem.folio,ItexpenseItem.item_description,ItexpenseItem.epr,ItexpenseItem.po).filter( ItexpenseItem.epr==epr,ItexpenseItem.folio==folio).all() return itemList def getItems(self,epr,folio): itemList = self.db.session.query(ItexpenseItem.folio, ItexpenseItem.item_description, ItexpenseItem.epr, ItexpenseItem.po).filter( ItexpenseItem.epr.like('{0}%'.format(epr)), ItexpenseItem.folio == folio).all() return itemList def updateItem(self,obj): self.db.session.query(ItexpenseItem).filter(ItexpenseItem.doc_id == obj.doc_id).update({'po': obj.po}) self.db.session.commit() def addItem(self,obj): self.db.session.add(obj) self.db.session.commit() def deleteItem(self,obj): self.db.session.query(ItexpenseItem).filter(ItexpenseItem.doc_id == obj.doc_id).delete() self.db.session.commit()7.主程序执行
自动获取的PO更新到系统里
记录点点滴滴