EASIEST Way to Download Twitch Clips Using Python Part 2 — Schedule downloads

The Scraper Guy

Yesterday's story can be found here —

Today we will build on that tutorial by scheduling our code to run at intervals and only scraping clips after a streamer has been live and gone offline, to prevent downloading the same clips repeatedly.

To begin, we will use Supabase as our DB; I have made plenty of tutorials using Supabase. If you don't have a Supabase account, you can make one. Create an organisation and a new project, the name of which doesn't matter.
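
If you don't already have the packages this tutorial relies on, both the Supabase Python client and the schedule library we'll use later are on PyPI. A minimal install, assuming Python 3 and pip:

pip install supabase schedule requests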

Create a new table; mine is called IsStreamerLive and has the following columns: id (int8), streamerName (text), streamerID (text), timeScraped (text), dateScraped (text) and isLive (bool).

Start off by testing your Supabase connection. Add your project URL and API key and ensure the connection is successful.

from supabase import create_client

# Both values are found in your Supabase project's API settings
API_URL = '{YOUR SUPABASE PROJECT URL}'
API_KEY = '{YOUR SUPABASE API KEY}'
supabase = create_client(API_URL, API_KEY)
supabase

You can also add some dummy data into your table and run the code below to ensure you can read the DB table correctly. You will also have to implement a couple of RLS policies for reading and writing to the table; you can read more about this here — https://supabase.com/docs/guides/database/postgres/row-level-security

data = supabase.table('IsStreamerLive').select('*').execute()
data
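
The rows come back on the response's .data attribute as a list of dicts, so a quick sanity check (assuming you inserted at least one dummy row) might look like this:

# Print a couple of fields from each row returned by the query above
for row in data.data:
    print(row['streamerName'], row['isLive'])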

Our first new function will check whether a given streamer is live or not.

import requests

def checkIfStreamerIsLive(bearerToken, streamerID):
    # Check if the streamer is live via the Helix streams endpoint;
    # it only returns entries in "data" for streams that are currently live
    url = f"https://api.twitch.tv/helix/streams?user_id={streamerID}"
    headers = {
        "accept": "application/json",
        "Authorization": f"Bearer {bearerToken}",
        "Client-Id": "{YOUR CLIENT ID}"
    }
    # Note: the client-credentials body belongs to the token request from
    # Part 1, not to this GET, so it has been dropped here
    request = requests.get(url, headers=headers)
    return request.json()

If the streamer is live, data will be returned in roughly the following format (an abridged, illustrative example; see Twitch's Get Streams docs for the full field list):
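
{
  "data": [
    {
      "id": "40952121085",
      "user_id": "101051819",
      "user_login": "somestreamer",
      "user_name": "SomeStreamer",
      "game_name": "Just Chatting",
      "type": "live",
      "title": "Stream title here",
      "viewer_count": 1490,
      "started_at": "2024-08-08T03:18:11Z"
    }
  ],
  "pagination": {}
}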

If the streamer is not live, the data array will be empty: {"data": [], "pagination": {}}.

We then create a new function checkIfStreamerIsLiveAndPostToDB that takes a bearerToken and today's date. This function will call checkIfStreamerIsLive for each streamer and create a new database row with isLive set to True if the length of the data returned is greater than 0, and isLive set to False if data is empty.

from datetime import datetime

def checkIfStreamerIsLiveAndPostToDB(bearerToken, todaysDate):
    # streamerIDs is the {streamerName: streamerID} dict from Part 1
    for key, value in streamerIDs.items():
        live = checkIfStreamerIsLive(bearerToken, value)
        # Assumption: the original referenced an undefined todaysTime,
        # so we record the current time here instead
        todaysTime = datetime.now().strftime("%H:%M:%S")
        data = supabase.table('IsStreamerLive').insert({
            "streamerName": str(key),
            "streamerID": str(value),
            "timeScraped": str(todaysTime),
            "dateScraped": str(todaysDate),
            "isLive": len(live['data']) > 0,  # True if any live stream data came back
        }).execute()
        print("Successfully Added New Record")

Following on from this, our last function is going to be checkIfClipsShouldBeScraped(), which queries our database for the two most recently scraped rows for each streamer. The logic here is that if the most recent row shows the streamer is not live and the second most recent row shows the streamer was live, then we proceed to scrape the clips. This ensures there are fresh clips to scrape, as the streamer has just finished streaming.

def checkIfClipsShouldBeScraped():
    for key, value in streamerIDs.items():
        # Fetch the two most recent rows for this streamer
        data = supabase.table('IsStreamerLive').select('*').eq("streamerName", key).order("id", desc=True).limit(2).execute()
        # Guard against the first run, when fewer than two rows exist yet
        if len(data.data) < 2:
            print(f"Not enough history for {key}")
            continue
        # The streamer just went offline: newest row not live, the one before was
        if not data.data[0]['isLive'] and data.data[1]['isLive']:
            print(f"Scraping clips for {key}")
            getTop5Clips(value, bearerToken, getTodaysDate(), key)  # from Part 1
        else:
            print(f"No Clips to Scrape for {key}")

Now that we have all of the new code created, we will run it every hour. You can change the time period if you want to increase or decrease the frequency the code is run, but for me an hour seems fine. We will be using the schedule library in Python for this task. Alternatively, you could run a cron job (see the example further down).

The code will run as shown below -

import time
import schedule

bearerToken = getBearerToken()  # from Part 1
supabase = createSupabaseInstance()  # from Part 1


def job():
    print("Scraping clips....")
    # Recompute the date on every run so a long-running script stays accurate
    todaysDate = getTodaysDate()
    checkIfStreamerIsLiveAndPostToDB(bearerToken, todaysDate)
    checkIfClipsShouldBeScraped()


# Run at 30 minutes past every hour
schedule.every().hour.at(":30").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

This code is relatively straightforward; if you are curious about how the schedule library works, you can find out more here —
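
If you'd rather go the cron route mentioned above, a hypothetical crontab entry that runs the script at 30 minutes past every hour (assuming the script lives at /path/to/clip_scraper.py; adjust the path and interpreter for your setup) might look like this:

30 * * * * /usr/bin/python3 /path/to/clip_scraper.py

With cron you would drop the schedule loop entirely and just call job() once per invocation.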

That's it for today's tutorial. Leave a clap if you enjoyed it, and don't hesitate to reach out to me if there are any questions.

Have a good day and happy scraping!

Follow me on X — https://x.com/PaulConish
