작은 메모장
5. RSS, FTP, API를 이용한 서비스 자동화 본문
저번 메일 자동화 이어서
import time
import smtplib
import requests
import openpyxl
import schedule
from datetime import datetime
from dotenv import load_dotenv
from bs4 import BeautifulSoup as bs
from email.header import Header
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
# Date stamp (YYYY-MM-DD) computed once at import time; it is embedded in the
# spreadsheet file name used by the functions below.
now = f"{datetime.now():%Y-%m-%d}"
def mail_sender():
    """Email today's malware spreadsheet as an attachment via Naver SMTP.

    Builds a multipart message (short text body plus the attachment
    ``study/static/{now}_malware.xlsx``) and sends it through
    ``smtp.naver.com:587`` after upgrading the connection with STARTTLS.
    Prints "Mail Sent" once the message has been handed to the server.

    Raises:
        OSError: if the spreadsheet cannot be opened or the server cannot
            be reached.
        smtplib.SMTPException: if login or delivery fails.
    """
    load_dotenv()
    # Insert Your ID Here
    SECRET_IP = "YOUR ID"
    SECRET_PASS = "YOUR PASSWORD"
    myemail = "YOUR EMAIL"
    youremail = "TARGET EMAIL"

    # Mail Header
    msg = MIMEMultipart()
    subject = "Sending new Malware Information"
    msg["Subject"] = Header(subject.encode("utf-8"), "utf-8")
    msg["From"] = myemail
    msg["To"] = youremail

    # Mail Content
    msg.attach(MIMEText("Automatic catch Information"))
    attachment_name = f"{now}_malware.xlsx"
    with open(f"study/static/{attachment_name}", "rb") as f:
        etc_content = MIMEApplication(f.read())
    etc_content.add_header(
        "Content-Disposition", "attachment", filename=attachment_name
    )
    msg.attach(etc_content)

    # The message is fully built before connecting, so a missing attachment
    # never leaves a logged-in session behind; the context manager sends QUIT
    # even when login or delivery raises.
    with smtplib.SMTP("smtp.naver.com", 587) as smtp:
        smtp.ehlo()
        smtp.starttls()
        smtp.login(SECRET_IP, SECRET_PASS)
        smtp.sendmail(myemail, youremail, msg.as_string())
    print("Mail Sent")
def info_crawler():
    """Scrape the 2023 index of malware-traffic-analysis.net into a spreadsheet.

    Writes a "Title"/"Link" header row followed by one row per element
    matched by ``.content > ul > li > .main_menu`` (its text and its
    absolute link), then saves the workbook to
    ``study/static/{now}_malware.xlsx``.

    Raises:
        requests.RequestException: if the page cannot be fetched in time or
            the server answers with an HTTP error status.
    """
    workbook = openpyxl.Workbook()
    worksheet = workbook.active
    worksheet["A1"] = "Title"
    worksheet["B1"] = "Link"

    # Crawling
    target_url = "https://www.malware-traffic-analysis.net/2023/"
    headers = {"User-Agent": "Mozilla/5.0", "Content-Type": "text/html; charset=utf-8"}
    # A timeout keeps the script from hanging forever on a stalled server, and
    # raise_for_status stops an error page from being saved as an empty report.
    req = requests.get(target_url + "index.html", headers=headers, timeout=10)
    req.raise_for_status()
    soup = bs(req.text, "lxml")

    # Page layout the selector below relies on:
    #   Main content       : .content > ul
    #   Individual content : [.content > ul] > li > .main_menu
    #   title              : [.content > ul > li > .main_menu] > a
    #   link               : [.content > ul > li > .main_menu] > a.href
    tags = soup.select(".content > ul > li > .main_menu")

    # File Output (row 1 holds the header, so data starts at row 2)
    for row, tag in enumerate(tags, start=2):
        worksheet.cell(row=row, column=1, value=tag.text)
        worksheet.cell(row=row, column=2, value=f"{target_url}{tag['href']}")
    workbook.save(f"study/static/{now}_malware.xlsx")
# Register the mail job to fire once per minute.
every_minute = schedule.every(1).minutes
every_minute.do(mail_sender)

# Crawl once so the attachment exists, then poll the scheduler forever.
info_crawler()
while True:
    schedule.run_pending()
    time.sleep(1)
크롤링과 메일 서비스를 합친 코드
< 설치한 라이브러리 >
pip install schedule
pip install feedparser
pip install pandas
pip install slack_sdk
FTP란?
인터넷의 대표적인 프로토콜 3개만 나열해보라 하면,
HTTP(웹서비스) - HTTPS(보안 강화버전)
FTP(파일전송) - secure FTP(보안 강화버전 )
Telnet(원격접속) - SSH (보안 강화버전 )
여기서 FTP는 File Transfer Protocol, 즉 파일 전송 프로토콜
말 그대로 파일 전송을 위한 프로토콜로, 실무에서는 이를 로그 전송, 정보 수집 등에 활용한다.
import feedparser
import pandas as pd
# Download the Dailysecu RSS feed and export its entries to an Excel sheet.
url = "https://www.dailysecu.com/rss/allArticle.xml"
feed = feedparser.parse(url)

# Feed entries are dict-like. Using .get() with an empty-string default keeps
# an item that lacks an optional element (author, publication date, ...) from
# aborting the whole export with AttributeError.
titles = [entry.get("title", "") for entry in feed.entries]
links = [entry.get("link", "") for entry in feed.entries]
descriptions = [entry.get("description", "") for entry in feed.entries]
authors = [entry.get("author", "") for entry in feed.entries]
pubDatas = [entry.get("published", "") for entry in feed.entries]

data = {
    "Title": titles,
    "Link": links,
    "Description": descriptions,
    "Author": authors,
    "Publish": pubDatas,
}
df = pd.DataFrame(data)
df.to_excel("study/static/result.xlsx", index=False)
RSS를 이용하는 코드
import feedparser
import pandas as pd
# Get URL: one feed address per line in rss.txt
with open("study/static/rss.txt", "r") as file:
    rss_urls = file.readlines()

for index, url in enumerate(rss_urls):
    # readlines() keeps the trailing newline; strip it so the parser gets a
    # clean URL, and skip blank lines instead of parsing an empty address.
    url = url.strip()
    if not url:
        continue
    feed = feedparser.parse(url)

    # .get() tolerates entries that lack an optional element such as author.
    data = {
        "Title": [entry.get("title", "") for entry in feed.entries],
        "Link": [entry.get("link", "") for entry in feed.entries],
        "Description": [entry.get("description", "") for entry in feed.entries],
        "Author": [entry.get("author", "") for entry in feed.entries],
        # "Publish": [entry.get("published", "") for entry in feed.entries],
    }
    df = pd.DataFrame(data)
    # Output files are numbered by the feed's line position in rss.txt (1-based).
    df.to_excel(f"study/static/{index + 1}_result.xlsx", index=False)
여러 RSS 주소를 자동으로 불러오는 코드
파일 입출력 사용
실제 FTP를 사용하기 위해 VMware 사용
사용환경은 칼리리눅스, NAT으로 연결
vsftpd로 로컬로 서버를 오픈
import ftplib
# Upload one result sheet to the local vsftpd server and list the remote dir.
storage_path = "study/static/"
ftp_host = "192.168.237.128"
ftp_user = "kali"
ftp_pass = "kali"

# Using FTP as a context manager closes the connection on exit, including
# when login or the transfer raises.
with ftplib.FTP(ftp_host) as ftp:
    ftp.login(ftp_user, ftp_pass)
    with open(f"{storage_path}1_result.xlsx", "rb") as f:
        ftp.storbinary("STOR 1_result.xlsx", f)
    ftp.dir()
    print(f"Current Path Info : {ftp.pwd()}")
로컬 ftp 서버에 간단한 정보 요청 코드
import os
import time
import ftplib
import zipfile
import schedule
import feedparser
import pandas as pd
from datetime import datetime
# Date stamp (YYYY-MM-DD) computed once at import time; part of the zip name.
now = f"{datetime.now():%Y-%m-%d}"
# Directory holding rss.txt and the generated sheets, and the directory that
# receives the zip archive.
content_path, zip_path = "study/static/", "study/zip_data/"
# Definition
def rss_get():
    """Export every feed listed in ``{content_path}rss.txt`` to an Excel file.

    Each non-blank line of rss.txt is treated as one feed URL. The feed's
    entries (title, link, description, author) are written to
    ``{content_path}{n}_result.xlsx``, where ``n`` is the 1-based line
    number of the URL in rss.txt.
    """
    # Get URL
    with open(f"{content_path}rss.txt", "r") as file:
        rss_urls = file.readlines()

    for index, url in enumerate(rss_urls):
        # readlines() keeps the trailing newline; strip it and skip blank
        # lines so only real addresses are parsed.
        url = url.strip()
        if not url:
            continue
        feed = feedparser.parse(url)

        # .get() tolerates entries that lack an optional element (e.g. author)
        # instead of raising AttributeError.
        data = {
            "Title": [entry.get("title", "") for entry in feed.entries],
            "Link": [entry.get("link", "") for entry in feed.entries],
            "Description": [entry.get("description", "") for entry in feed.entries],
            "Author": [entry.get("author", "") for entry in feed.entries],
            # "Publish": [entry.get("published", "") for entry in feed.entries],
        }
        df = pd.DataFrame(data)
        # Write next to rss.txt via content_path rather than a second
        # hard-coded copy of the same directory.
        df.to_excel(f"{content_path}{index + 1}_result.xlsx", index=False)
def zipper():
    """Archive every file under ``content_path`` into today's zip file.

    The archive is written to ``{zip_path}{now}static_folder.zip``; files are
    stored under the same relative paths that ``os.walk`` yields.
    """
    # Create the output directory on first use instead of failing with
    # FileNotFoundError.
    os.makedirs(zip_path, exist_ok=True)
    # The context manager finalizes and closes the archive even if a write
    # fails part-way through.
    with zipfile.ZipFile(f"{zip_path}{now}static_folder.zip", "w") as zip_file:
        for root, _dirs, files in os.walk(content_path):
            for file in files:
                zip_file.write(os.path.join(root, file))
def ftp_contact():
    """Upload today's zip archive to the Kali FTP server.

    Sends ``{zip_path}{now}static_folder.zip`` and stores it remotely under
    the fixed (undated) name ``static_folder.zip``.

    Raises:
        OSError: if the archive cannot be opened or the server is unreachable.
        ftplib.Error: if login or the transfer is rejected.
    """
    ftp_host = "192.168.237.128"
    ftp_user = "kali"
    ftp_pass = "kali"

    # Using FTP as a context manager closes the connection on exit, including
    # when login or the transfer raises.
    with ftplib.FTP(ftp_host) as ftp:
        ftp.login(ftp_user, ftp_pass)
        with open(f"{zip_path}{now}static_folder.zip", "rb") as f:
            ftp.storbinary("STOR static_folder.zip", f)
# Scheduler
def save_on_kali():
    """Scheduled job: zip the content folder, pause briefly, then upload it."""
    zipper()
    # The os module has no sleep(); the original os.sleep(1) raised
    # AttributeError before the upload could run.
    time.sleep(1)
    ftp_contact()
# Register the daily zip-and-upload job (only runs if a scheduler loop polls it).
schedule.every(1).days.do(save_on_kali)

if __name__ == "__main__":
    # One-shot run: fetch the feeds, archive them, upload the archive.
    for step in (rss_get, zipper, ftp_contact):
        step()
    # Scheduler loop, currently disabled:
    # while True:
    #     rss_get()
    #     schedule.run_pending()
    #     time.sleep(30)
ZIP파일 만들어서 Kali한테 전달
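# curl -X POST -H 'Content-type: application/json' --data '{"text":"Hello, World!"}' https://hooks.slack.com/services/<WORKSPACE_ID>/<WEBHOOK_ID>/<SECRET_TOKEN>
# NOTE: a webhook URL is a credential - anyone who has it can post to the channel, so never publish the real one.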
import requests
import json
# Incoming-webhook URL of the target Slack channel (placeholder).
slack_url = "SLACK URL INPUT"


def sendSlackWebhook(strText):
    """Post ``strText`` as a plain-text message to the Slack incoming webhook.

    Prints "Sending Done" on HTTP 200; otherwise prints the status code and
    response body so the failure can be diagnosed.

    Raises:
        requests.RequestException: if the request cannot be completed
            within the timeout.
    """
    headers = {"Content-type": "application/json"}
    data = {"text": strText}
    # A timeout keeps a stalled connection from blocking the caller forever.
    response = requests.post(
        slack_url, headers=headers, data=json.dumps(data), timeout=10
    )
    if response.status_code == 200:
        print("Sending Done")
    else:
        print(f"error occurred: HTTP {response.status_code} {response.text}")


sendSlackWebhook("this is a test")
API를 활용하여 slack 채널에 메시지 전송
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
# Bot token used to authenticate the Slack WebClient (placeholder).
SLACK_API_TOKEN = "BOT API TOKEN"


def sendSlackWebhook(file_path):
    """Upload the file at ``file_path`` to the #python-test Slack channel.

    Prints a success message when the upload call returns, or the error
    when the SDK raises SlackApiError.
    """
    client = WebClient(token=SLACK_API_TOKEN)
    try:
        # NOTE(review): files_upload appears to wrap Slack's legacy
        # files.upload method; newer slack_sdk releases provide
        # files_upload_v2 - confirm which one this workspace still accepts.
        client.files_upload(
            channels="#python-test", file=file_path, title="테스트입니다."
        )
    except SlackApiError as e:
        print(f"오류 발생 {e}")
    else:
        print("정상적으로 보냄")


output_path = "study/zip_data/static_folder.zip"
sendSlackWebhook(output_path)
API를 활용하여 slack 채널에 파일 전송
'실더스 루키즈 교육' 카테고리의 다른 글
7. CriminalIP API 활용 및 doc 파일 문서 자동화 (0) | 2023.11.29 |
---|---|
6. 파일 속성 리스트와 중요 정보 탐지 (0) | 2023.11.28 |
4. 파일 입출력, 메일 자동화, 웹 서비스 크롤링 (0) | 2023.11.24 |
3. 인프라 활용을 위한 파이썬 (0) | 2023.11.23 |
2. Cloud DataCenter 개론 (0) | 2023.11.22 |