XRootD
Loading...
Searching...
No Matches
XrdPfcCommand.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19#include "XrdPfcInfo.hh"
20#include "XrdPfc.hh"
21#include "XrdPfcTrace.hh"
22
24#include "XrdOss/XrdOss.hh"
25#include "XrdOuc/XrdOuca2x.hh"
26#include "XrdOuc/XrdOucArgs.hh"
27#include "XrdOuc/XrdOucEnv.hh"
31
32#include <algorithm>
33#include <cstring>
34#include <iostream>
35#include <fcntl.h>
36#include <vector>
37#include <sys/time.h>
38
39using namespace XrdPfc;
40
41//______________________________________________________________________________
42
43const int MAX_ACCESSES = 20;
44
45const long long ONE_MB = 1024ll * 1024;
46const long long ONE_GB = 1024ll * 1024 * 1024;
47
48void Cache::ExecuteCommandUrl(const std::string& command_url)
49{
50 static const char *top_epfx = "ExecuteCommandUrl ";
51
52 SplitParser cp(command_url, "/");
53
54 std::string token = cp.get_token();
55
56 if (token != "xrdpfc_command")
57 {
58 TRACE(Error, top_epfx << "First token is NOT xrdpfc_command.");
59 return;
60 }
61
62 // Get the command
63 token = cp.get_token();
64
65
66 //================================================================
67 // create_file
68 //================================================================
69
70 if (token == "create_file")
71 {
72 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/create_file: ";
73 static const char* usage =
74 "Usage: create_file/ [-h] [-s filesize] [-b blocksize] [-t access_time] [-d access_duration]/<path>\n"
75 " Creates a cache file with given parameters. Data in file is random.\n"
76 " Useful for cache purge testing.\n"
77 "Notes:\n"
78 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n"
79 " . Default filesize=1G, blocksize=<as configured>, access_time=-10, access_duration=10.\n"
80 " . -t and -d can be given multiple times to record several accesses.\n"
81 " . Negative arguments given to -t are interpreted as relative to now.\n";
82
83 const Configuration &conf = m_configuration;
84
85 token = cp.get_token();
86
87 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
88
89 std::vector<char*> argv;
90 SplitParser ap(token, " ");
91 int argc = ap.fill_argv(argv);
92
93 long long file_size = ONE_GB;
94 long long block_size = conf.m_bufferSize;
95 int access_time [MAX_ACCESSES];
96 int access_duration[MAX_ACCESSES];
97 int at_count = 0, ad_count = 0;
98 XrdOucArgs Spec(&m_log, err_prefix, "hvs:b:t:d:",
99 "help", 1, "h",
100 "verbose", 1, "v",
101 "size", 1, "s",
102 "blocksize", 1, "b",
103 "time", 1, "t",
104 "duration", 1, "d",
105 (const char *) 0);
106
107 time_t time_now = time(0);
108
109 Spec.Set(argc, &argv[0]);
110 char theOpt;
111
112 while ((theOpt = Spec.getopt()) != (char) -1)
113 {
114 switch (theOpt)
115 {
116 case 'h': {
117 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
118 return;
119 }
120 case 's': {
121 if (XrdOuca2x::a2sz(m_log, "Error getting filesize", Spec.getarg(),
122 &file_size, 0ll, 32 * ONE_GB))
123 return;
124 break;
125 }
126 case 'b': {
127 if (XrdOuca2x::a2sz(m_log, "Error getting blocksize", Spec.getarg(),
128 &block_size, 0ll, 64 * ONE_MB))
129 return;
130 break;
131 }
132 case 't': {
133 if (XrdOuca2x::a2i(m_log, "Error getting access time", Spec.getarg(),
134 &access_time[at_count++], INT_MIN, INT_MAX))
135 return;
136 break;
137 }
138 case 'd': {
139 if (XrdOuca2x::a2i(m_log, "Error getting access duration", Spec.getarg(),
140 &access_duration[ad_count++], 0, 24 * 3600))
141 return;
142 break;
143 }
144 default: {
145 TRACE(Error, err_prefix << "Unhandled command argument.");
146 return;
147 }
148 }
149 }
150 if (Spec.getarg())
151 {
152 TRACE(Error, err_prefix << "Options must take up all the arguments.");
153 return;
154 }
155
156 if (at_count < 1) access_time [at_count++] = time_now - 10;
157 if (ad_count < 1) access_duration[ad_count++] = 10;
158
159 if (at_count != ad_count)
160 {
161 TRACE(Error, err_prefix << "Options -t and -d must be given the same number of times.");
162 return;
163 }
164
165 std::string file_path (cp.get_reminder_with_delim());
166 std::string cinfo_path(file_path + Info::s_infoExtension);
167
168 TRACE(Debug, err_prefix << "Command arguments parsed successfully. Proceeding to create file " << file_path);
169
170 // Check if cinfo exists ... bail out if it does.
171 {
172 struct stat infoStat;
173 if (GetOss()->Stat(cinfo_path.c_str(), &infoStat) == XrdOssOK)
174 {
175 TRACE(Error, err_prefix << "cinfo file already exists for '" << file_path << "'. Refusing to overwrite.");
176 return;
177 }
178 }
179
180 TRACE(Debug, err_prefix << "Command arguments parsed successfully, proceeding to execution.");
181
182 {
183 const char *myUser = conf.m_username.c_str();
184 XrdOucEnv myEnv;
185
186 // Create the data file.
187
188 char size_str[32]; sprintf(size_str, "%lld", file_size);
189 myEnv.Put("oss.asize", size_str);
190 myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
191 int cret;
192 if ((cret = GetOss()->Create(myUser, file_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
193 {
194 TRACE(Error, err_prefix << "Create failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
195 return;
196 }
197
198 XrdOssDF *myFile = GetOss()->newFile(myUser);
199 if ((cret = myFile->Open(file_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
200 {
201 TRACE(Error, err_prefix << "Open failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(-cret));
202 delete myFile;
203 return;
204 }
205
206 // Create the info file.
207
208 myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
209 myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
210 if ((cret = GetOss()->Create(myUser, cinfo_path.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
211 {
212 TRACE(Error, err_prefix << "Create failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
213 myFile->Close(); delete myFile;
214 return;
215 }
216
217 XrdOssDF *myInfoFile = GetOss()->newFile(myUser);
218 if ((cret = myInfoFile->Open(cinfo_path.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
219 {
220 TRACE(Error, err_prefix << "Open failed for info file " << cinfo_path << ", " << ERRNO_AND_ERRSTR(-cret));
221 delete myInfoFile;
222 myFile->Close(); delete myFile;
223 return;
224 }
225
226 // Allocate space for the data file.
227
228 if ((cret = posix_fallocate(myFile->getFD(), 0, file_size)))
229 {
230 TRACE(Error, err_prefix << "posix_fallocate failed for data file " << file_path << ", " << ERRNO_AND_ERRSTR(cret));
231 }
232
233 // Fill up cinfo.
234
235 Info myInfo(m_trace, false);
236 myInfo.SetBufferSizeFileSizeAndCreationTime(block_size, file_size);
237 myInfo.SetAllBitsSynced();
238
239 for (int i = 0; i < at_count; ++i)
240 {
241 time_t att_time = access_time[i] >= 0 ? access_time[i] : time_now + access_time[i];
242
243 myInfo.WriteIOStatSingle(file_size, att_time, att_time + access_duration[i]);
244 }
245
246 myInfo.Write(myInfoFile, cinfo_path.c_str());
247
248 myInfoFile->Close(); delete myInfoFile;
249 myFile->Close(); delete myFile;
250
251 TRACE(Info, err_prefix << "Created file '" << file_path << "', size=" << (file_size>>20) << "MB.");
252
253 {
254 XrdSysCondVarHelper lock(&m_writeQ.condVar);
255
256 m_writeQ.writes_between_purges += file_size;
257 }
258 }
259 }
260
261 //================================================================
262 // remove_file
263 //================================================================
264
265 else if (token == "remove_file")
266 {
267 static const char* err_prefix = "ExecuteCommandUrl: /xrdpfc_command/remove_file: ";
268 static const char* usage =
269 "Usage: remove_file/ [-h] /<path>\n"
270 " Removes given file from the cache unless it is currently open.\n"
271 " Useful for removal of stale files or duplicate files in a caching cluster.\n"
272 "Notes:\n"
273 " . If no options are needed one should still leave a space between / separators, ie., '/ /'\n";
274
275 token = cp.get_token();
276
277 TRACE(Debug, err_prefix << "Entered with argument string '" << token <<"'.");
278
279 std::vector<char*> argv;
280 SplitParser ap(token, " ");
281 int argc = ap.fill_argv(argv);
282
283 XrdOucArgs Spec(&m_log, err_prefix, "hvs:b:t:d:",
284 "help", 1, "h",
285 (const char *) 0);
286
287 Spec.Set(argc, &argv[0]);
288 char theOpt;
289
290 while ((theOpt = Spec.getopt()) != (char) -1)
291 {
292 switch (theOpt)
293 {
294 case 'h': {
295 m_log.Say(err_prefix, " -- printing help, no action will be taken\n", usage);
296 return;
297 }
298 default: {
299 TRACE(Error, err_prefix << "Unhandled command argument.");
300 return;
301 }
302 }
303 }
304 if (Spec.getarg())
305 {
306 TRACE(Error, err_prefix << "Options must take up all the arguments.");
307 return;
308 }
309
310 std::string f_name(cp.get_reminder());
311
312 TRACE(Debug, err_prefix << "file argument '" << f_name << "'.");
313
314 int ret = UnlinkFile(f_name, true);
315
316 TRACE(Info, err_prefix << "returned with status " << ret);
317 }
318
319 //================================================================
320 // unknown command
321 //================================================================
322
323 else
324 {
325 TRACE(Error, top_epfx << "Unknown or empty command '" << token << "'");
326 }
327}
328
329
330//==============================================================================
331// Example python script to use /xrdpfc_command/
332//==============================================================================
333/*
334from XRootD import client
335from XRootD.client.flags import OpenFlags
336
337import sys
338import time
339
340#-------------------------------------------------------------------------------
341
342port = int( sys.argv[1] );
343
344g_srv = "root://localhost:%d/" % port
345g_com = "/xrdpfc_command/create_file/"
346g_dir = "/store/user/matevz/"
347
348#-------------------------------------------------------------------------------
349
350def xxsend(args, file) :
351
352 url = g_srv + g_com + args + g_dir + file
353 print "Opening ", url
354
355 with client.File() as f:
356 status, response = f.open(url, OpenFlags.READ)
357
358 print '%r' % status
359 print '%r' % response
360
361#-------------------------------------------------------------------------------
362
363pfx1 = "AAAA"
364pfx2 = "BBBB"
365
366for i in range(1, 1024 + 1):
367
368 atime = -10000 + i
369
370 xxsend("-s 4g -t %d -d 10" % atime,
371 "%s-%04d" % (pfx1, i))
372
373 time.sleep(0.01)
374
375
376for i in range(1, 512 + 1):
377
378 atime = -5000 + i
379
380 xxsend("-s 4g -t %d -d 10" % atime,
381 "%s-%04d" % (pfx2, i))
382
383 time.sleep(0.01)
384 */
struct stat Stat
Definition XrdCks.cc:49
void usage()
#define XrdOssOK
Definition XrdOss.hh:50
#define XRDOSS_mkpath
Definition XrdOss.hh:466
const int MAX_ACCESSES
const long long ONE_GB
const long long ONE_MB
#define ERRNO_AND_ERRSTR(err_code)
#define stat(a, b)
Definition XrdPosix.hh:96
bool Debug
bool Create
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition XrdOss.hh:426
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual XrdOssDF * newFile(const char *tident)=0
char getopt()
char * getarg()
void Set(char *arglist)
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:45
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:257
void ExecuteCommandUrl(const std::string &command_url)
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1171
XrdOss * GetOss() const
Definition XrdPfc.hh:389
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45
static const char * s_infoExtension
void WriteIOStatSingle(long long bytes_disk)
Write single open/close time for given bytes read from disk.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
void SetAllBitsSynced()
Mark all blocks as synced to disk.
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:56
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:82
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:101
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:83
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:81
char * get_reminder_with_delim()
Definition XrdPfc.hh:156
char * get_token()
Definition XrdPfc.hh:150
char * get_reminder()
Definition XrdPfc.hh:162
int fill_argv(std::vector< char * > &argv)
Definition XrdPfc.hh:167